diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java index d8d41d5d53..e56ad7f161 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java @@ -16,11 +16,13 @@ * limitations under the License. */ package org.apache.hadoop.ozone.om.helpers; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.audit.Auditable; +import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -102,6 +104,14 @@ public Map toAuditMap() { return auditMap; } + @VisibleForTesting + public void addLocationInfo(OmKeyLocationInfo locationInfo) { + if (this.locationInfoList == null) { + locationInfoList = new ArrayList<>(); + } + locationInfoList.add(locationInfo); + } + /** * Builder class of OmKeyArgs. */ diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java index ee23fe06fa..41a876bd78 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java @@ -1,52 +1,54 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.hadoop.ozone.om; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; -import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; import org.apache.hadoop.util.Time; import org.apache.hadoop.utils.BackgroundService; import org.apache.hadoop.utils.BackgroundTask; import org.apache.hadoop.utils.BackgroundTaskQueue; import org.apache.hadoop.utils.BackgroundTaskResult; import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult; +import org.apache.hadoop.utils.db.Table; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT; /** - * This is the background service to delete keys. - * Scan the metadata of om periodically to get - * the keys with prefix "#deleting" and ask scm to - * delete metadata accordingly, if scm returns - * success for keys, then clean up those keys. + * This is the background service to delete keys. Scan the metadata of om + * periodically to get the keys from DeletedTable and ask scm to delete + * metadata accordingly, if scm returns success for keys, then clean up those + * keys. */ public class KeyDeletingService extends BackgroundService { - private static final Logger LOG = LoggerFactory.getLogger(KeyDeletingService.class); @@ -56,6 +58,8 @@ public class KeyDeletingService extends BackgroundService { private final ScmBlockLocationProtocol scmClient; private final KeyManager manager; private final int keyLimitPerTask; + private final AtomicLong deletedKeyCount; + private final AtomicLong runCount; public KeyDeletingService(ScmBlockLocationProtocol scmClient, KeyManager manager, long serviceInterval, @@ -66,6 +70,28 @@ public KeyDeletingService(ScmBlockLocationProtocol scmClient, this.manager = manager; this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK, OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT); + this.deletedKeyCount = new AtomicLong(0); + this.runCount = new AtomicLong(0); + } + + /** + * Returns the number of times this Background service has run. + * + * @return Long, run count. + */ + @VisibleForTesting + public AtomicLong getRunCount() { + return runCount; + } + + /** + * Returns the number of keys deleted by the background service. + * + * @return Long count. + */ + @VisibleForTesting + public AtomicLong getDeletedKeyCount() { + return deletedKeyCount; } @Override @@ -76,11 +102,11 @@ public BackgroundTaskQueue getTasks() { } /** - * A key deleting task scans OM DB and looking for a certain number - * of pending-deletion keys, sends these keys along with their associated - * blocks to SCM for deletion. Once SCM confirms keys are deleted (once - * SCM persisted the blocks info in its deletedBlockLog), it removes - * these keys from the DB. + * A key deleting task scans OM DB and looking for a certain number of + * pending-deletion keys, sends these keys along with their associated blocks + * to SCM for deletion. Once SCM confirms keys are deleted (once SCM persisted + * the blocks info in its deletedBlockLog), it removes these keys from the + * DB. */ private class KeyDeletingTask implements BackgroundTask { @@ -92,51 +118,55 @@ public int getPriority() { @Override public BackgroundTaskResult call() throws Exception { + runCount.incrementAndGet(); try { long startTime = Time.monotonicNow(); List keyBlocksList = manager .getPendingDeletionKeys(keyLimitPerTask); - if (keyBlocksList.size() > 0) { - LOG.info("Found {} to-delete keys in OM", keyBlocksList.size()); + if (keyBlocksList != null && keyBlocksList.size() > 0) { List results = scmClient.deleteKeyBlocks(keyBlocksList); - for (DeleteBlockGroupResult result : results) { - if (result.isSuccess()) { - try { - // Purge key from OM DB. - manager.deletePendingDeletionKey(result.getObjectKey()); - LOG.debug("Key {} deleted from OM DB", result.getObjectKey()); - } catch (IOException e) { - // if a pending deletion key is failed to delete, - // print a warning here and retain it in this state, - // so that it can be attempt to delete next time. - LOG.warn("Failed to delete pending-deletion key {}", - result.getObjectKey(), e); - } - } else { - // Key deletion failed, retry in next interval. - LOG.warn("Key {} deletion failed because some of the blocks" - + " were failed to delete, failed blocks: {}", - result.getObjectKey(), - StringUtils.join(",", result.getFailedBlocks())); - } + if (results != null) { + int delCount = deleteAllKeys(results); + LOG.debug("Number of keys deleted: {}, elapsed time: {}ms", + delCount, Time.monotonicNow() - startTime); + deletedKeyCount.addAndGet(delCount); } - - if (!results.isEmpty()) { - LOG.info("Number of key deleted from OM DB: {}," - + " task elapsed time: {}ms", - results.size(), Time.monotonicNow() - startTime); - } - - return results::size; - } else { - LOG.debug("No pending deletion key found in OM"); } } catch (IOException e) { - LOG.error("Unable to get pending deletion keys, retry in" - + " next interval", e); + LOG.error("Error while running delete keys background task. Will " + + "retry at next run.", e); } + // By desing, no one cares about the results of this call back. return EmptyTaskResult.newResult(); } + + /** + * Deletes all the keys that SCM has acknowledged and queued for delete. + * + * @param results DeleteBlockGroups returned by SCM. + * @throws RocksDBException on Error. + * @throws IOException on Error + */ + private int deleteAllKeys(List results) + throws RocksDBException, IOException { + Table deletedTable = manager.getMetadataManager().getDeletedTable(); + // Put all keys to delete in a single transaction and call for delete. + int deletedCount = 0; + try (WriteBatch writeBatch = new WriteBatch()) { + for (DeleteBlockGroupResult result : results) { + if (result.isSuccess()) { + // Purge key from OM DB. + writeBatch.delete(deletedTable.getHandle(), + DFSUtil.string2Bytes(result.getObjectKey())); + LOG.debug("Key {} deleted from OM DB", result.getObjectKey()); + deletedCount++; + } + } + // Write a single transaction for delete. + manager.getMetadataManager().getStore().write(writeBatch); + } + return deletedCount; + } } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index a512d7be00..83363e72ad 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -21,6 +21,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; +import org.apache.hadoop.utils.BackgroundService; import java.io.IOException; import java.util.List; @@ -143,16 +144,6 @@ List listKeys(String volumeName, */ List getPendingDeletionKeys(int count) throws IOException; - /** - * Deletes a pending deletion key by its name. This is often called when - * key can be safely deleted from this layer. Once called, all footprints - * of the key will be purged from OM DB. - * - * @param objectKeyName object key name with #deleting# prefix. - * @throws IOException if specified key doesn't exist or other I/O errors. - */ - void deletePendingDeletionKey(String objectKeyName) throws IOException; - /** * Returns a list of all still open key info. Which contains the info about * the key name and all its associated block IDs. A pending open key has @@ -172,4 +163,17 @@ List listKeys(String volumeName, * @throws IOException if specified key doesn't exist or other I/O errors. */ void deleteExpiredOpenKey(String objectKeyName) throws IOException; + + /** + * Returns the metadataManager. + * @return OMMetadataManager. + */ + OMMetadataManager getMetadataManager(); + + /** + * Returns the instance of Deleting Service. + * @return Background service. + */ + BackgroundService getDeletingService(); + } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index d5855235a6..06d25878a5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo; import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.BackgroundService; import org.rocksdb.RocksDBException; import org.rocksdb.WriteBatch; import org.slf4j.Logger; @@ -43,9 +44,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; @@ -69,6 +75,8 @@ public class KeyManagerImpl implements KeyManager { private final long preallocateMax; private final String omId; + private final BackgroundService keyDeletingService; + public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, OMMetadataManager metadataManager, OzoneConfiguration conf, @@ -82,15 +90,28 @@ public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, this.preallocateMax = conf.getLong( OZONE_KEY_PREALLOCATION_MAXSIZE, OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT); + long blockDeleteInterval = conf.getTimeDuration( + OZONE_BLOCK_DELETING_SERVICE_INTERVAL, + OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + long serviceTimeout = conf.getTimeDuration( + OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, + OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + keyDeletingService = new KeyDeletingService( + scmBlockClient, this, blockDeleteInterval, serviceTimeout, conf); + this.omId = omId; } @Override public void start() { + keyDeletingService.start(); } @Override public void stop() throws IOException { + keyDeletingService.shutdown(); } private void validateBucket(String volumeName, String bucketName) @@ -460,14 +481,7 @@ public List listKeys(String volumeName, String bucketName, @Override public List getPendingDeletionKeys(final int count) throws IOException { - //TODO: Fix this in later patches. - return null; - } - - @Override - public void deletePendingDeletionKey(String objectKeyName) - throws IOException { - // TODO : Fix in later patches. + return metadataManager.getPendingDeletionKeys(count); } @Override @@ -485,4 +499,14 @@ public void deleteExpiredOpenKey(String objectKeyName) throws IOException { Preconditions.checkNotNull(objectKeyName); // TODO: Fix this in later patches. } + + @Override + public OMMetadataManager getMetadataManager() { + return metadataManager; + } + + @Override + public BackgroundService getDeletingService() { + return keyDeletingService; + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index 151fddf1cc..16625dc064 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -273,7 +273,7 @@ public byte[] getOzoneKeyBytes(String volume, String bucket, String key) { @Override public byte[] getOpenKeyBytes(String volume, String bucket, - String key, long id) { + String key, long id) { String openKey = OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket + OM_KEY_PREFIX + key + OM_KEY_PREFIX + id; return DFSUtil.string2Bytes(openKey); @@ -573,27 +573,37 @@ private VolumeList getVolumesByUser(byte[] userNameKey) } @Override - public List getPendingDeletionKeys(final int count) + public List getPendingDeletionKeys(final int keyCount) throws IOException { List keyBlocksList = Lists.newArrayList(); - // TODO: Fix this later, Not part of this patch. - List> rangeResult = Collections.emptyList(); - for (Map.Entry entry : rangeResult) { - OmKeyInfo info = - OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue())); - // Get block keys as a list. - OmKeyLocationInfoGroup latest = info.getLatestVersionLocations(); - if (latest == null) { - return Collections.emptyList(); + try (TableIterator keyIter = getDeletedTable().iterator()) { + int currentCount = 0; + while (keyIter.hasNext() && currentCount < keyCount) { + Table.KeyValue kv = keyIter.next(); + if (kv != null) { + OmKeyInfo info = + OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(kv.getValue())); + // Get block keys as a list. + OmKeyLocationInfoGroup latest = info.getLatestVersionLocations(); + if (latest == null) { + // This means that we have a key without any blocks. + // BUG-BUG: if this happens the key will never be deleted. + // TODO: Right thing to do is to remove this key right here. + LOG.warn("Found a key without blocks: {}, skipping for now.", + DFSUtil.bytes2String(kv.getKey())); + continue; + } + List item = latest.getLocationList().stream() + .map(b -> new BlockID(b.getContainerID(), b.getLocalID())) + .collect(Collectors.toList()); + BlockGroup keyBlocks = BlockGroup.newBuilder() + .setKeyName(DFSUtil.bytes2String(kv.getKey())) + .addAllBlockIDs(item) + .build(); + keyBlocksList.add(keyBlocks); + currentCount++; + } } - List item = latest.getLocationList().stream() - .map(b -> new BlockID(b.getContainerID(), b.getLocalID())) - .collect(Collectors.toList()); - BlockGroup keyBlocks = BlockGroup.newBuilder() - .setKeyName(DFSUtil.bytes2String(entry.getKey())) - .addAllBlockIDs(item) - .build(); - keyBlocksList.add(keyBlocks); } return keyBlocksList; } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java new file mode 100644 index 0000000000..2da60ded67 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.ozone.om; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmInfo; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.apache.hadoop.hdds.protocol.proto + .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result; +import static org.apache.hadoop.hdds.protocol.proto + .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result.success; +import static org.apache.hadoop.hdds.protocol.proto + .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result.unknownFailure; + +/** + * This is a testing client that allows us to intercept calls from OzoneManager + * to SCM. + *

+ * TODO: OzoneManager#getScmBlockClient -- so that we can load this class up via + * config setting into OzoneManager. Right now, we just pass this to + * KeyDeletingService only. + *

+ * TODO: Move this class to a generic test utils so we can use this class in + * other Ozone Manager tests. + */ +public class ScmBlockLocationTestIngClient implements ScmBlockLocationProtocol { + private static final Logger LOG = + LoggerFactory.getLogger(ScmBlockLocationTestIngClient.class); + private final String clusterID; + private final String scmId; + + // 0 means no calls will fail, +1 means all calls will fail, +2 means every + // second call will fail, +3 means every third and so on. + private final int failCallsFrequency; + private int currentCall = 0; + + /** + * If ClusterID or SCMID is blank a per instance ID is generated. + * + * @param clusterID - String or blank. + * @param scmId - String or Blank. + * @param failCallsFrequency - Set to 0 for no failures, 1 for always to fail, + * a positive number for that frequency of failure. + */ + public ScmBlockLocationTestIngClient(String clusterID, String scmId, + int failCallsFrequency) { + this.clusterID = StringUtils.isNotBlank(clusterID) ? clusterID : + UUID.randomUUID().toString(); + this.scmId = StringUtils.isNotBlank(scmId) ? scmId : + UUID.randomUUID().toString(); + this.failCallsFrequency = Math.abs(failCallsFrequency); + switch (this.failCallsFrequency) { + case 0: + LOG.debug("Set to no failure mode, all delete block calls will " + + "succeed."); + break; + case 1: + LOG.debug("Set to all failure mode. All delete block calls to SCM" + + " will fail."); + break; + default: + LOG.debug("Set to Mix mode, every {} -th call will fail", + this.failCallsFrequency); + } + + } + + /** + * Returns Fake blocks to the KeyManager so we get blocks in the Database. + * @param size - size of the block. + * @param type Replication Type + * @param factor - Replication factor + * @param owner - String owner. + * @return + * @throws IOException + */ + @Override + public AllocatedBlock allocateBlock(long size, + HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, + String owner) throws IOException { + DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); + Pipeline pipeline = createPipeline(datanodeDetails); + long containerID = Time.monotonicNow(); + long localID = Time.monotonicNow(); + AllocatedBlock.Builder abb = + new AllocatedBlock.Builder() + .setBlockID(new BlockID(containerID, localID)) + .setPipeline(pipeline) + .setShouldCreateContainer(false); + return abb.build(); + } + + private Pipeline createPipeline(DatanodeDetails datanode) { + final Pipeline pipeline = + new Pipeline(datanode.getUuidString(), HddsProtos.LifeCycleState.OPEN, + HddsProtos.ReplicationType.STAND_ALONE, + HddsProtos.ReplicationFactor.ONE, + PipelineID.randomId()); + pipeline.addMember(datanode); + return pipeline; + } + + @Override + public List deleteKeyBlocks( + List keyBlocksInfoList) throws IOException { + List results = new ArrayList<>(); + List blockResultList = new ArrayList<>(); + Result result; + for (BlockGroup keyBlocks : keyBlocksInfoList) { + for (BlockID blockKey : keyBlocks.getBlockIDList()) { + currentCall++; + switch (this.failCallsFrequency) { + case 0: + result = success; + break; + case 1: + result = unknownFailure; + break; + default: + if (currentCall % this.failCallsFrequency == 0) { + result = unknownFailure; + } else { + result = success; + } + } + blockResultList.add(new DeleteBlockResult(blockKey, result)); + } + results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(), + blockResultList)); + } + return results; + } + + @Override + public ScmInfo getScmInfo() throws IOException { + ScmInfo.Builder builder = + new ScmInfo.Builder() + .setClusterId(clusterID) + .setScmId(scmId); + return builder.build(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java new file mode 100644 index 0000000000..44e3bdf82b --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.ozone.om; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.server.ServerUtils; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OpenKeySession; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.utils.db.DBConfigFromFile; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.hdds.HddsConfigKeys + .HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_INTERVAL; + +/** + * Test Key Deleting Service. + *

+ * This test does the following things. + *

+ * 1. Creates a bunch of keys. 2. Then executes delete key directly using + * Metadata Manager. 3. Waits for a while for the KeyDeleting Service to pick up + * and call into SCM. 4. Confirms that calls have been successful. + */ +public class TestKeyDeletingService { + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private OzoneConfiguration createConfAndInitValues() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + File newFolder = folder.newFolder(); + if (!newFolder.exists()) { + Assert.assertTrue(newFolder.mkdirs()); + } + System.setProperty(DBConfigFromFile.CONFIG_DIR, "/"); + ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString()); + conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, + TimeUnit.MILLISECONDS); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200, + TimeUnit.MILLISECONDS); + conf.setQuietMode(false); + + return conf; + } + + /** + * In this test, we create a bunch of keys and delete them. Then we start the + * KeyDeletingService and pass a SCMClient which does not fail. We make sure + * that all the keys that we deleted is picked up and deleted by + * OzoneManager. + * + * @throws IOException - on Failure. + */ + + @Test(timeout = 30000) + public void checkIfDeleteServiceisDeletingKeys() + throws IOException, TimeoutException, InterruptedException { + OzoneConfiguration conf = createConfAndInitValues(); + OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf); + KeyManager keyManager = + new KeyManagerImpl( + new ScmBlockLocationTestIngClient(null, null, 0), + metaMgr, conf, UUID.randomUUID().toString()); + final int keyCount = 100; + createAndDeleteKeys(keyManager, keyCount); + KeyDeletingService keyDeletingService = + (KeyDeletingService) keyManager.getDeletingService(); + keyManager.start(); + GenericTestUtils.waitFor( + () -> keyDeletingService.getDeletedKeyCount().get() >= keyCount, + 1000, 10000); + Assert.assertTrue(keyDeletingService.getRunCount().get() > 1); + } + + @Test(timeout = 30000) + public void checkIfDeleteServiceWithFailingSCM() + throws IOException, TimeoutException, InterruptedException { + OzoneConfiguration conf = createConfAndInitValues(); + OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf); + //failCallsFrequency = 1 , means all calls fail. + KeyManager keyManager = + new KeyManagerImpl( + new ScmBlockLocationTestIngClient(null, null, 1), + metaMgr, conf, UUID.randomUUID().toString()); + final int keyCount = 100; + createAndDeleteKeys(keyManager, keyCount); + KeyDeletingService keyDeletingService = + (KeyDeletingService) keyManager.getDeletingService(); + keyManager.start(); + // Make sure that we have run the background thread 5 times more + GenericTestUtils.waitFor( + () -> keyDeletingService.getRunCount().get() >= 5, + 100, 1000); + // Since SCM calls are failing, deletedKeyCount should be zero. + Assert.assertEquals(keyDeletingService.getDeletedKeyCount().get(), 0); + + + } + + private void createAndDeleteKeys(KeyManager keyManager, int keyCount) + throws IOException { + for (int x = 0; x < keyCount; x++) { + String volumeName = String.format("volume%s", + RandomStringUtils.randomAlphanumeric(5)); + String bucketName = String.format("bucket%s", + RandomStringUtils.randomAlphanumeric(5)); + String keyName = String.format("key%s", + RandomStringUtils.randomAlphanumeric(5)); + byte[] volumeBytes = + keyManager.getMetadataManager().getVolumeKey(volumeName); + byte[] bucketBytes = + keyManager.getMetadataManager().getBucketKey(volumeName, bucketName); + // cheat here, just create a volume and bucket entry so that we can + // create the keys, we put the same data for key and value since the + // system does not decode the object + keyManager.getMetadataManager().getVolumeTable().put(volumeBytes, + volumeBytes); + + keyManager.getMetadataManager().getBucketTable().put(bucketBytes, + bucketBytes); + + OmKeyArgs arg = + new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .build(); + //Open, Commit and Delete the Keys in the Key Manager. + OpenKeySession session = keyManager.openKey(arg); + arg.addLocationInfo(keyManager.allocateBlock(arg, session.getId())); + keyManager.commitKey(arg, session.getId()); + keyManager.deleteKey(arg); + } + } +} \ No newline at end of file