HDDS-358. Use DBStore and TableStore for DeleteKeyService. Contributed by Anu Engineer
This commit is contained in:
parent
dffb7bfe6c
commit
df0d61e3a0
@ -16,11 +16,13 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.om.helpers;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.audit.Auditable;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -102,6 +104,14 @@ public Map<String, String> toAuditMap() {
|
||||
return auditMap;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void addLocationInfo(OmKeyLocationInfo locationInfo) {
|
||||
if (this.locationInfoList == null) {
|
||||
locationInfoList = new ArrayList<>();
|
||||
}
|
||||
locationInfoList.add(locationInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* Builder class of OmKeyArgs.
|
||||
*/
|
||||
|
@ -1,52 +1,54 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.om;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.utils.BackgroundService;
|
||||
import org.apache.hadoop.utils.BackgroundTask;
|
||||
import org.apache.hadoop.utils.BackgroundTaskQueue;
|
||||
import org.apache.hadoop.utils.BackgroundTaskResult;
|
||||
import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
|
||||
import org.apache.hadoop.utils.db.Table;
|
||||
import org.rocksdb.RocksDBException;
|
||||
import org.rocksdb.WriteBatch;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
|
||||
|
||||
/**
|
||||
* This is the background service to delete keys.
|
||||
* Scan the metadata of om periodically to get
|
||||
* the keys with prefix "#deleting" and ask scm to
|
||||
* delete metadata accordingly, if scm returns
|
||||
* success for keys, then clean up those keys.
|
||||
* This is the background service to delete keys. Scan the metadata of om
|
||||
* periodically to get the keys from DeletedTable and ask scm to delete
|
||||
* metadata accordingly, if scm returns success for keys, then clean up those
|
||||
* keys.
|
||||
*/
|
||||
public class KeyDeletingService extends BackgroundService {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(KeyDeletingService.class);
|
||||
|
||||
@ -56,6 +58,8 @@ public class KeyDeletingService extends BackgroundService {
|
||||
private final ScmBlockLocationProtocol scmClient;
|
||||
private final KeyManager manager;
|
||||
private final int keyLimitPerTask;
|
||||
private final AtomicLong deletedKeyCount;
|
||||
private final AtomicLong runCount;
|
||||
|
||||
public KeyDeletingService(ScmBlockLocationProtocol scmClient,
|
||||
KeyManager manager, long serviceInterval,
|
||||
@ -66,6 +70,28 @@ public KeyDeletingService(ScmBlockLocationProtocol scmClient,
|
||||
this.manager = manager;
|
||||
this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
|
||||
OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
|
||||
this.deletedKeyCount = new AtomicLong(0);
|
||||
this.runCount = new AtomicLong(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of times this Background service has run.
|
||||
*
|
||||
* @return Long, run count.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public AtomicLong getRunCount() {
|
||||
return runCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of keys deleted by the background service.
|
||||
*
|
||||
* @return Long count.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public AtomicLong getDeletedKeyCount() {
|
||||
return deletedKeyCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -76,11 +102,11 @@ public BackgroundTaskQueue getTasks() {
|
||||
}
|
||||
|
||||
/**
|
||||
* A key deleting task scans OM DB and looking for a certain number
|
||||
* of pending-deletion keys, sends these keys along with their associated
|
||||
* blocks to SCM for deletion. Once SCM confirms keys are deleted (once
|
||||
* SCM persisted the blocks info in its deletedBlockLog), it removes
|
||||
* these keys from the DB.
|
||||
* A key deleting task scans OM DB and looking for a certain number of
|
||||
* pending-deletion keys, sends these keys along with their associated blocks
|
||||
* to SCM for deletion. Once SCM confirms keys are deleted (once SCM persisted
|
||||
* the blocks info in its deletedBlockLog), it removes these keys from the
|
||||
* DB.
|
||||
*/
|
||||
private class KeyDeletingTask implements
|
||||
BackgroundTask<BackgroundTaskResult> {
|
||||
@ -92,51 +118,55 @@ public int getPriority() {
|
||||
|
||||
@Override
|
||||
public BackgroundTaskResult call() throws Exception {
|
||||
runCount.incrementAndGet();
|
||||
try {
|
||||
long startTime = Time.monotonicNow();
|
||||
List<BlockGroup> keyBlocksList = manager
|
||||
.getPendingDeletionKeys(keyLimitPerTask);
|
||||
if (keyBlocksList.size() > 0) {
|
||||
LOG.info("Found {} to-delete keys in OM", keyBlocksList.size());
|
||||
if (keyBlocksList != null && keyBlocksList.size() > 0) {
|
||||
List<DeleteBlockGroupResult> results =
|
||||
scmClient.deleteKeyBlocks(keyBlocksList);
|
||||
if (results != null) {
|
||||
int delCount = deleteAllKeys(results);
|
||||
LOG.debug("Number of keys deleted: {}, elapsed time: {}ms",
|
||||
delCount, Time.monotonicNow() - startTime);
|
||||
deletedKeyCount.addAndGet(delCount);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error while running delete keys background task. Will " +
|
||||
"retry at next run.", e);
|
||||
}
|
||||
// By desing, no one cares about the results of this call back.
|
||||
return EmptyTaskResult.newResult();
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes all the keys that SCM has acknowledged and queued for delete.
|
||||
*
|
||||
* @param results DeleteBlockGroups returned by SCM.
|
||||
* @throws RocksDBException on Error.
|
||||
* @throws IOException on Error
|
||||
*/
|
||||
private int deleteAllKeys(List<DeleteBlockGroupResult> results)
|
||||
throws RocksDBException, IOException {
|
||||
Table deletedTable = manager.getMetadataManager().getDeletedTable();
|
||||
// Put all keys to delete in a single transaction and call for delete.
|
||||
int deletedCount = 0;
|
||||
try (WriteBatch writeBatch = new WriteBatch()) {
|
||||
for (DeleteBlockGroupResult result : results) {
|
||||
if (result.isSuccess()) {
|
||||
try {
|
||||
// Purge key from OM DB.
|
||||
manager.deletePendingDeletionKey(result.getObjectKey());
|
||||
writeBatch.delete(deletedTable.getHandle(),
|
||||
DFSUtil.string2Bytes(result.getObjectKey()));
|
||||
LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
|
||||
} catch (IOException e) {
|
||||
// if a pending deletion key is failed to delete,
|
||||
// print a warning here and retain it in this state,
|
||||
// so that it can be attempt to delete next time.
|
||||
LOG.warn("Failed to delete pending-deletion key {}",
|
||||
result.getObjectKey(), e);
|
||||
}
|
||||
} else {
|
||||
// Key deletion failed, retry in next interval.
|
||||
LOG.warn("Key {} deletion failed because some of the blocks"
|
||||
+ " were failed to delete, failed blocks: {}",
|
||||
result.getObjectKey(),
|
||||
StringUtils.join(",", result.getFailedBlocks()));
|
||||
deletedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
if (!results.isEmpty()) {
|
||||
LOG.info("Number of key deleted from OM DB: {},"
|
||||
+ " task elapsed time: {}ms",
|
||||
results.size(), Time.monotonicNow() - startTime);
|
||||
// Write a single transaction for delete.
|
||||
manager.getMetadataManager().getStore().write(writeBatch);
|
||||
}
|
||||
|
||||
return results::size;
|
||||
} else {
|
||||
LOG.debug("No pending deletion key found in OM");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to get pending deletion keys, retry in"
|
||||
+ " next interval", e);
|
||||
}
|
||||
return EmptyTaskResult.newResult();
|
||||
return deletedCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
|
||||
import org.apache.hadoop.utils.BackgroundService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
@ -143,16 +144,6 @@ List<OmKeyInfo> listKeys(String volumeName,
|
||||
*/
|
||||
List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
|
||||
|
||||
/**
|
||||
* Deletes a pending deletion key by its name. This is often called when
|
||||
* key can be safely deleted from this layer. Once called, all footprints
|
||||
* of the key will be purged from OM DB.
|
||||
*
|
||||
* @param objectKeyName object key name with #deleting# prefix.
|
||||
* @throws IOException if specified key doesn't exist or other I/O errors.
|
||||
*/
|
||||
void deletePendingDeletionKey(String objectKeyName) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns a list of all still open key info. Which contains the info about
|
||||
* the key name and all its associated block IDs. A pending open key has
|
||||
@ -172,4 +163,17 @@ List<OmKeyInfo> listKeys(String volumeName,
|
||||
* @throws IOException if specified key doesn't exist or other I/O errors.
|
||||
*/
|
||||
void deleteExpiredOpenKey(String objectKeyName) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns the metadataManager.
|
||||
* @return OMMetadataManager.
|
||||
*/
|
||||
OMMetadataManager getMetadataManager();
|
||||
|
||||
/**
|
||||
* Returns the instance of Deleting Service.
|
||||
* @return Background service.
|
||||
*/
|
||||
BackgroundService getDeletingService();
|
||||
|
||||
}
|
||||
|
@ -34,6 +34,7 @@
|
||||
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.utils.BackgroundService;
|
||||
import org.rocksdb.RocksDBException;
|
||||
import org.rocksdb.WriteBatch;
|
||||
import org.slf4j.Logger;
|
||||
@ -43,9 +44,14 @@
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
|
||||
@ -69,6 +75,8 @@ public class KeyManagerImpl implements KeyManager {
|
||||
private final long preallocateMax;
|
||||
private final String omId;
|
||||
|
||||
private final BackgroundService keyDeletingService;
|
||||
|
||||
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
|
||||
OMMetadataManager metadataManager,
|
||||
OzoneConfiguration conf,
|
||||
@ -82,15 +90,28 @@ public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
|
||||
this.preallocateMax = conf.getLong(
|
||||
OZONE_KEY_PREALLOCATION_MAXSIZE,
|
||||
OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT);
|
||||
long blockDeleteInterval = conf.getTimeDuration(
|
||||
OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
|
||||
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
long serviceTimeout = conf.getTimeDuration(
|
||||
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
|
||||
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
keyDeletingService = new KeyDeletingService(
|
||||
scmBlockClient, this, blockDeleteInterval, serviceTimeout, conf);
|
||||
|
||||
this.omId = omId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
keyDeletingService.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws IOException {
|
||||
keyDeletingService.shutdown();
|
||||
}
|
||||
|
||||
private void validateBucket(String volumeName, String bucketName)
|
||||
@ -460,14 +481,7 @@ public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
|
||||
@Override
|
||||
public List<BlockGroup> getPendingDeletionKeys(final int count)
|
||||
throws IOException {
|
||||
//TODO: Fix this in later patches.
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deletePendingDeletionKey(String objectKeyName)
|
||||
throws IOException {
|
||||
// TODO : Fix in later patches.
|
||||
return metadataManager.getPendingDeletionKeys(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -485,4 +499,14 @@ public void deleteExpiredOpenKey(String objectKeyName) throws IOException {
|
||||
Preconditions.checkNotNull(objectKeyName);
|
||||
// TODO: Fix this in later patches.
|
||||
}
|
||||
|
||||
@Override
|
||||
public OMMetadataManager getMetadataManager() {
|
||||
return metadataManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BackgroundService getDeletingService() {
|
||||
return keyDeletingService;
|
||||
}
|
||||
}
|
||||
|
@ -573,27 +573,37 @@ private VolumeList getVolumesByUser(byte[] userNameKey)
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<BlockGroup> getPendingDeletionKeys(final int count)
|
||||
public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
|
||||
throws IOException {
|
||||
List<BlockGroup> keyBlocksList = Lists.newArrayList();
|
||||
// TODO: Fix this later, Not part of this patch.
|
||||
List<Map.Entry<byte[], byte[]>> rangeResult = Collections.emptyList();
|
||||
for (Map.Entry<byte[], byte[]> entry : rangeResult) {
|
||||
try (TableIterator<Table.KeyValue> keyIter = getDeletedTable().iterator()) {
|
||||
int currentCount = 0;
|
||||
while (keyIter.hasNext() && currentCount < keyCount) {
|
||||
Table.KeyValue kv = keyIter.next();
|
||||
if (kv != null) {
|
||||
OmKeyInfo info =
|
||||
OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
|
||||
OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(kv.getValue()));
|
||||
// Get block keys as a list.
|
||||
OmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
|
||||
if (latest == null) {
|
||||
return Collections.emptyList();
|
||||
// This means that we have a key without any blocks.
|
||||
// BUG-BUG: if this happens the key will never be deleted.
|
||||
// TODO: Right thing to do is to remove this key right here.
|
||||
LOG.warn("Found a key without blocks: {}, skipping for now.",
|
||||
DFSUtil.bytes2String(kv.getKey()));
|
||||
continue;
|
||||
}
|
||||
List<BlockID> item = latest.getLocationList().stream()
|
||||
.map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
|
||||
.collect(Collectors.toList());
|
||||
BlockGroup keyBlocks = BlockGroup.newBuilder()
|
||||
.setKeyName(DFSUtil.bytes2String(entry.getKey()))
|
||||
.setKeyName(DFSUtil.bytes2String(kv.getKey()))
|
||||
.addAllBlockIDs(item)
|
||||
.build();
|
||||
keyBlocksList.add(keyBlocks);
|
||||
currentCount++;
|
||||
}
|
||||
}
|
||||
}
|
||||
return keyBlocksList;
|
||||
}
|
||||
|
@ -0,0 +1,178 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.om;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.ScmInfo;
|
||||
import org.apache.hadoop.hdds.scm.TestUtils;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
|
||||
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.apache.hadoop.hdds.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
|
||||
import static org.apache.hadoop.hdds.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result.success;
|
||||
import static org.apache.hadoop.hdds.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result.unknownFailure;
|
||||
|
||||
/**
|
||||
* This is a testing client that allows us to intercept calls from OzoneManager
|
||||
* to SCM.
|
||||
* <p>
|
||||
* TODO: OzoneManager#getScmBlockClient -- so that we can load this class up via
|
||||
* config setting into OzoneManager. Right now, we just pass this to
|
||||
* KeyDeletingService only.
|
||||
* <p>
|
||||
* TODO: Move this class to a generic test utils so we can use this class in
|
||||
* other Ozone Manager tests.
|
||||
*/
|
||||
public class ScmBlockLocationTestIngClient implements ScmBlockLocationProtocol {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ScmBlockLocationTestIngClient.class);
|
||||
private final String clusterID;
|
||||
private final String scmId;
|
||||
|
||||
// 0 means no calls will fail, +1 means all calls will fail, +2 means every
|
||||
// second call will fail, +3 means every third and so on.
|
||||
private final int failCallsFrequency;
|
||||
private int currentCall = 0;
|
||||
|
||||
/**
|
||||
* If ClusterID or SCMID is blank a per instance ID is generated.
|
||||
*
|
||||
* @param clusterID - String or blank.
|
||||
* @param scmId - String or Blank.
|
||||
* @param failCallsFrequency - Set to 0 for no failures, 1 for always to fail,
|
||||
* a positive number for that frequency of failure.
|
||||
*/
|
||||
public ScmBlockLocationTestIngClient(String clusterID, String scmId,
|
||||
int failCallsFrequency) {
|
||||
this.clusterID = StringUtils.isNotBlank(clusterID) ? clusterID :
|
||||
UUID.randomUUID().toString();
|
||||
this.scmId = StringUtils.isNotBlank(scmId) ? scmId :
|
||||
UUID.randomUUID().toString();
|
||||
this.failCallsFrequency = Math.abs(failCallsFrequency);
|
||||
switch (this.failCallsFrequency) {
|
||||
case 0:
|
||||
LOG.debug("Set to no failure mode, all delete block calls will " +
|
||||
"succeed.");
|
||||
break;
|
||||
case 1:
|
||||
LOG.debug("Set to all failure mode. All delete block calls to SCM" +
|
||||
" will fail.");
|
||||
break;
|
||||
default:
|
||||
LOG.debug("Set to Mix mode, every {} -th call will fail",
|
||||
this.failCallsFrequency);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns Fake blocks to the KeyManager so we get blocks in the Database.
|
||||
* @param size - size of the block.
|
||||
* @param type Replication Type
|
||||
* @param factor - Replication factor
|
||||
* @param owner - String owner.
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public AllocatedBlock allocateBlock(long size,
|
||||
HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
|
||||
String owner) throws IOException {
|
||||
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
|
||||
Pipeline pipeline = createPipeline(datanodeDetails);
|
||||
long containerID = Time.monotonicNow();
|
||||
long localID = Time.monotonicNow();
|
||||
AllocatedBlock.Builder abb =
|
||||
new AllocatedBlock.Builder()
|
||||
.setBlockID(new BlockID(containerID, localID))
|
||||
.setPipeline(pipeline)
|
||||
.setShouldCreateContainer(false);
|
||||
return abb.build();
|
||||
}
|
||||
|
||||
private Pipeline createPipeline(DatanodeDetails datanode) {
|
||||
final Pipeline pipeline =
|
||||
new Pipeline(datanode.getUuidString(), HddsProtos.LifeCycleState.OPEN,
|
||||
HddsProtos.ReplicationType.STAND_ALONE,
|
||||
HddsProtos.ReplicationFactor.ONE,
|
||||
PipelineID.randomId());
|
||||
pipeline.addMember(datanode);
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<DeleteBlockGroupResult> deleteKeyBlocks(
|
||||
List<BlockGroup> keyBlocksInfoList) throws IOException {
|
||||
List<DeleteBlockGroupResult> results = new ArrayList<>();
|
||||
List<DeleteBlockResult> blockResultList = new ArrayList<>();
|
||||
Result result;
|
||||
for (BlockGroup keyBlocks : keyBlocksInfoList) {
|
||||
for (BlockID blockKey : keyBlocks.getBlockIDList()) {
|
||||
currentCall++;
|
||||
switch (this.failCallsFrequency) {
|
||||
case 0:
|
||||
result = success;
|
||||
break;
|
||||
case 1:
|
||||
result = unknownFailure;
|
||||
break;
|
||||
default:
|
||||
if (currentCall % this.failCallsFrequency == 0) {
|
||||
result = unknownFailure;
|
||||
} else {
|
||||
result = success;
|
||||
}
|
||||
}
|
||||
blockResultList.add(new DeleteBlockResult(blockKey, result));
|
||||
}
|
||||
results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
|
||||
blockResultList));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScmInfo getScmInfo() throws IOException {
|
||||
ScmInfo.Builder builder =
|
||||
new ScmInfo.Builder()
|
||||
.setClusterId(clusterID)
|
||||
.setScmId(scmId);
|
||||
return builder.build();
|
||||
}
|
||||
}
|
@ -0,0 +1,164 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.om;
|
||||
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.server.ServerUtils;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
||||
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.utils.db.DBConfigFromFile;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.apache.hadoop.hdds.HddsConfigKeys
|
||||
.HDDS_CONTAINER_REPORT_INTERVAL;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
|
||||
|
||||
/**
|
||||
* Test Key Deleting Service.
|
||||
* <p>
|
||||
* This test does the following things.
|
||||
* <p>
|
||||
* 1. Creates a bunch of keys. 2. Then executes delete key directly using
|
||||
* Metadata Manager. 3. Waits for a while for the KeyDeleting Service to pick up
|
||||
* and call into SCM. 4. Confirms that calls have been successful.
|
||||
*/
|
||||
public class TestKeyDeletingService {
|
||||
@Rule
|
||||
public TemporaryFolder folder = new TemporaryFolder();
|
||||
|
||||
private OzoneConfiguration createConfAndInitValues() throws IOException {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
File newFolder = folder.newFolder();
|
||||
if (!newFolder.exists()) {
|
||||
Assert.assertTrue(newFolder.mkdirs());
|
||||
}
|
||||
System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
|
||||
ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
|
||||
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
|
||||
TimeUnit.MILLISECONDS);
|
||||
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
|
||||
TimeUnit.MILLISECONDS);
|
||||
conf.setQuietMode(false);
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test, we create a bunch of keys and delete them. Then we start the
|
||||
* KeyDeletingService and pass a SCMClient which does not fail. We make sure
|
||||
* that all the keys that we deleted is picked up and deleted by
|
||||
* OzoneManager.
|
||||
*
|
||||
* @throws IOException - on Failure.
|
||||
*/
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void checkIfDeleteServiceisDeletingKeys()
|
||||
throws IOException, TimeoutException, InterruptedException {
|
||||
OzoneConfiguration conf = createConfAndInitValues();
|
||||
OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf);
|
||||
KeyManager keyManager =
|
||||
new KeyManagerImpl(
|
||||
new ScmBlockLocationTestIngClient(null, null, 0),
|
||||
metaMgr, conf, UUID.randomUUID().toString());
|
||||
final int keyCount = 100;
|
||||
createAndDeleteKeys(keyManager, keyCount);
|
||||
KeyDeletingService keyDeletingService =
|
||||
(KeyDeletingService) keyManager.getDeletingService();
|
||||
keyManager.start();
|
||||
GenericTestUtils.waitFor(
|
||||
() -> keyDeletingService.getDeletedKeyCount().get() >= keyCount,
|
||||
1000, 10000);
|
||||
Assert.assertTrue(keyDeletingService.getRunCount().get() > 1);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void checkIfDeleteServiceWithFailingSCM()
|
||||
throws IOException, TimeoutException, InterruptedException {
|
||||
OzoneConfiguration conf = createConfAndInitValues();
|
||||
OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf);
|
||||
//failCallsFrequency = 1 , means all calls fail.
|
||||
KeyManager keyManager =
|
||||
new KeyManagerImpl(
|
||||
new ScmBlockLocationTestIngClient(null, null, 1),
|
||||
metaMgr, conf, UUID.randomUUID().toString());
|
||||
final int keyCount = 100;
|
||||
createAndDeleteKeys(keyManager, keyCount);
|
||||
KeyDeletingService keyDeletingService =
|
||||
(KeyDeletingService) keyManager.getDeletingService();
|
||||
keyManager.start();
|
||||
// Make sure that we have run the background thread 5 times more
|
||||
GenericTestUtils.waitFor(
|
||||
() -> keyDeletingService.getRunCount().get() >= 5,
|
||||
100, 1000);
|
||||
// Since SCM calls are failing, deletedKeyCount should be zero.
|
||||
Assert.assertEquals(keyDeletingService.getDeletedKeyCount().get(), 0);
|
||||
|
||||
|
||||
}
|
||||
|
||||
private void createAndDeleteKeys(KeyManager keyManager, int keyCount)
|
||||
throws IOException {
|
||||
for (int x = 0; x < keyCount; x++) {
|
||||
String volumeName = String.format("volume%s",
|
||||
RandomStringUtils.randomAlphanumeric(5));
|
||||
String bucketName = String.format("bucket%s",
|
||||
RandomStringUtils.randomAlphanumeric(5));
|
||||
String keyName = String.format("key%s",
|
||||
RandomStringUtils.randomAlphanumeric(5));
|
||||
byte[] volumeBytes =
|
||||
keyManager.getMetadataManager().getVolumeKey(volumeName);
|
||||
byte[] bucketBytes =
|
||||
keyManager.getMetadataManager().getBucketKey(volumeName, bucketName);
|
||||
// cheat here, just create a volume and bucket entry so that we can
|
||||
// create the keys, we put the same data for key and value since the
|
||||
// system does not decode the object
|
||||
keyManager.getMetadataManager().getVolumeTable().put(volumeBytes,
|
||||
volumeBytes);
|
||||
|
||||
keyManager.getMetadataManager().getBucketTable().put(bucketBytes,
|
||||
bucketBytes);
|
||||
|
||||
OmKeyArgs arg =
|
||||
new OmKeyArgs.Builder()
|
||||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.setKeyName(keyName)
|
||||
.build();
|
||||
//Open, Commit and Delete the Keys in the Key Manager.
|
||||
OpenKeySession session = keyManager.openKey(arg);
|
||||
arg.addLocationInfo(keyManager.allocateBlock(arg, session.getId()));
|
||||
keyManager.commitKey(arg, session.getId());
|
||||
keyManager.deleteKey(arg);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user