HDFS-12283. Ozone: DeleteKey-5: Implement SCM DeletedBlockLog. Contributed by Yuanbo Liu.

This commit is contained in:
Weiwei Yang 2017-08-24 13:46:03 +08:00 committed by Owen O'Malley
parent f51189e2bb
commit 4d3b8d332d
8 changed files with 595 additions and 1 deletions

View File

@ -80,6 +80,7 @@ public final class OzoneConsts {
public static final String BLOCK_DB = "block.db";
public static final String NODEPOOL_DB = "nodepool.db";
public static final String OPEN_CONTAINERS_DB = "openContainers.db";
public static final String DELETED_BLOCK_DB = "deletedBlock.db";
public static final String KSM_DB_NAME = "ksm.db";
/**

View File

@ -212,6 +212,9 @@ public final class ScmConfigKeys {
public static final int OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT =
300; // Default 5 minute wait.
public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
"ozone.scm.block.deletion.max.retry";
public static final int OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT = 4096;
/**
* Never constructed.

View File

@ -88,6 +88,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// Track all containers owned by block service.
private final MetadataStore containerStore;
private final DeletedBlockLog deletedBlockLog;
private Map<OzoneProtos.LifeCycleState,
Map<String, BlockContainerInfo>> containers;
@ -142,6 +143,7 @@ public BlockManagerImpl(final Configuration conf,
this.lock = new ReentrantLock();
mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
deletedBlockLog = new DeletedBlockLogImpl(conf);
}
// TODO: close full (or almost full) containers with a separate thread.
@ -490,7 +492,9 @@ public void close() throws IOException {
if (containerStore != null) {
containerStore.close();
}
if (deletedBlockLog != null) {
deletedBlockLog.close();
}
MBeans.unregister(mxBean);
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.block;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
/**
* The DeletedBlockLog is a persisted log in SCM to keep tracking
* container blocks which are under deletion. It maintains info
* about under-deletion container blocks that notified by KSM,
* and the state how it is processed.
*/
public interface DeletedBlockLog extends Closeable {
/**
* A limit size list of transactions. Note count is the max number
* of TXs to return, we might not be able to always return this
* number. and the processCount of those transactions
* should be [0, MAX_RETRY).
*
* @param count - number of transactions.
* @return a list of BlockDeletionTransaction.
*/
List<DeletedBlocksTransaction> getTransactions(int count)
throws IOException;
/**
* Increments count for given list of transactions by 1.
* The log maintains a valid range of counts for each transaction
* [0, MAX_RETRY]. If exceed this range, resets it to -1 to indicate
* the transaction is no longer valid.
*
* @param txIDs - transaction ID.
*/
void incrementCount(List<Long> txIDs)
throws IOException;
/**
* Commits a transaction means to delete all footprints of a transaction
* from the log. This method doesn't guarantee all transactions can be
* successfully deleted, it tolerate failures and tries best efforts to.
*
* @param txIDs - transaction IDs.
*/
void commitTransactions(List<Long> txIDs) throws IOException;
/**
* Creates a block deletion transaction and adds that into the log.
*
* @param containerName - container name.
* @param blocks - blocks that belong to the same container.
*
* @throws IOException
*/
void addTransaction(String containerName, List<String> blocks)
throws IOException;
}

View File

@ -0,0 +1,246 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.block;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.hadoop.ozone.OzoneConsts.DELETED_BLOCK_DB;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
/**
* A implement class of {@link DeletedBlockLog}, and it uses
* K/V db to maintain block deletion transactions between scm and datanode.
* This is a very basic implementation, it simply scans the log and
* memorize the position that scanned by last time, and uses this to
* determine where the next scan starts. It has no notion about weight
* of each transaction so as long as transaction is still valid, they get
* equally same chance to be retrieved which only depends on the nature
* order of the transaction ID.
*/
public class DeletedBlockLogImpl implements DeletedBlockLog {
private static final Logger LOG =
LoggerFactory.getLogger(DeletedBlockLogImpl.class);
private static final byte[] LATEST_TXID =
DFSUtil.string2Bytes("#LATEST_TXID#");
private final int maxRetry;
private final MetadataStore deletedStore;
private final Lock lock;
// The latest id of deleted blocks in the db.
private long lastTxID;
private long lastReadTxID;
public DeletedBlockLogImpl(Configuration conf) throws IOException {
maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
File metaDir = OzoneUtils.getScmMetadirPath(conf);
String scmMetaDataDir = metaDir.getPath();
File deletedLogDbPath = new File(scmMetaDataDir, DELETED_BLOCK_DB);
int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
// Load store of all transactions.
deletedStore = MetadataStoreBuilder.newBuilder()
.setCreateIfMissing(true)
.setConf(conf)
.setDbFile(deletedLogDbPath)
.setCacheSize(cacheSize * OzoneConsts.MB)
.build();
this.lock = new ReentrantLock();
// start from the head of deleted store.
lastReadTxID = 0;
lastTxID = findLatestTxIDInStore();
}
@VisibleForTesting
MetadataStore getDeletedStore() {
return deletedStore;
}
/**
* There is no need to lock before reading because
* it's only used in construct method.
*
* @return latest txid.
* @throws IOException
*/
private long findLatestTxIDInStore() throws IOException {
long txid = 0;
byte[] value = deletedStore.get(LATEST_TXID);
if (value != null) {
txid = Longs.fromByteArray(value);
}
return txid;
}
@Override
public List<DeletedBlocksTransaction> getTransactions(
int count) throws IOException {
List<DeletedBlocksTransaction> result = new ArrayList<>();
MetadataKeyFilter getNextTxID = (preKey, currentKey, nextKey)
-> Longs.fromByteArray(currentKey) > lastReadTxID;
MetadataKeyFilter avoidInvalidTxid = (preKey, currentKey, nextKey)
-> !Arrays.equals(LATEST_TXID, currentKey);
lock.lock();
try {
deletedStore.iterate(null, (key, value) -> {
if (getNextTxID.filterKey(null, key, null) &&
avoidInvalidTxid.filterKey(null, key, null)) {
DeletedBlocksTransaction block = DeletedBlocksTransaction
.parseFrom(value);
if (block.getCount() > -1 && block.getCount() <= maxRetry) {
result.add(block);
}
}
return result.size() < count;
});
// Scan the metadata from the beginning.
if (result.size() < count || result.size() < 1) {
lastReadTxID = 0;
} else {
lastReadTxID = result.get(result.size() - 1).getTxID();
}
} finally {
lock.unlock();
}
return result;
}
/**
* {@inheritDoc}
*
* @param txIDs - transaction ID.
* @throws IOException
*/
@Override
public void incrementCount(List<Long> txIDs) throws IOException {
BatchOperation batch = new BatchOperation();
lock.lock();
try {
for(Long txID : txIDs) {
try {
DeletedBlocksTransaction block = DeletedBlocksTransaction
.parseFrom(deletedStore.get(Longs.toByteArray(txID)));
DeletedBlocksTransaction.Builder builder = block.toBuilder();
if (block.getCount() > -1) {
builder.setCount(block.getCount() + 1);
}
// if the retry time exceeds the maxRetry value
// then set the retry value to -1, stop retrying, admins can
// analyze those blocks and purge them manually by SCMCli.
if (block.getCount() > maxRetry) {
builder.setCount(-1);
}
deletedStore.put(Longs.toByteArray(txID),
builder.build().toByteArray());
} catch (IOException ex) {
LOG.warn("Cannot increase count for txID " + txID, ex);
}
}
deletedStore.writeBatch(batch);
} finally {
lock.unlock();
}
}
/**
* {@inheritDoc}
*
* @param txIDs - transaction IDs.
* @throws IOException
*/
@Override
public void commitTransactions(List<Long> txIDs) throws IOException {
lock.lock();
try {
for (Long txID : txIDs) {
try {
deletedStore.delete(Longs.toByteArray(txID));
} catch (IOException ex) {
LOG.warn("Cannot commit txID " + txID, ex);
}
}
} finally {
lock.unlock();
}
}
/**
* {@inheritDoc}
*
* @param containerName - container name.
* @param blocks - blocks that belong to the same container.
* @throws IOException
*/
@Override
public void addTransaction(String containerName, List<String> blocks)
throws IOException {
BatchOperation batch = new BatchOperation();
lock.lock();
try {
DeletedBlocksTransaction tx = DeletedBlocksTransaction.newBuilder()
.setTxID(lastTxID + 1)
.setContainerName(containerName)
.addAllBlockID(blocks)
.setCount(0)
.build();
byte[] key = Longs.toByteArray(lastTxID + 1);
batch.put(key, tx.toByteArray());
batch.put(LATEST_TXID, Longs.toByteArray(lastTxID + 1));
deletedStore.writeBatch(batch);
lastTxID += 1;
} finally {
lock.unlock();
}
}
@Override
public void close() throws IOException {
if (deletedStore != null) {
deletedStore.close();
}
}
}

View File

@ -99,6 +99,15 @@ message ContainerInfo {
optional int64 keycount = 4;
}
// The deleted blocks which are stored in deletedBlock.db of scm.
message DeletedBlocksTransaction {
required int64 txID = 1;
required string containerName = 2;
repeated string blockID = 3;
// the retry time of sending deleting command to datanode.
required int32 count = 4;
}
/**
A set of container reports, max count is generally set to
8192 since that keeps the size of the reports under 1 MB.

View File

@ -264,6 +264,19 @@
</description>
</property>
<property>
<name>ozone.scm.block.deletion.max.retry</name>
<value>4096</value>
<description>
SCM wraps up a number of blocks in a deletion transaction and send that
to datanode for physically deletion periodically. This property
determines how many times at most for SCM to retry sending a deletion
transaction to datanode. The default value 4096 is relatively big so
that SCM could try enough times before giving up, as the actual deletion
is async so time required is unpredictable.
</description>
</property>
<property>
<name>ozone.scm.heartbeat.log.warn.interval.count</name>
<value>10</value>

View File

@ -0,0 +1,240 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.block;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
/**
* Tests for DeletedBlockLog.
*/
public class TestDeletedBlockLog {
private static DeletedBlockLogImpl deletedBlockLog;
private OzoneConfiguration conf;
private File testDir;
@Before
public void setup() throws Exception {
testDir = GenericTestUtils.getTestDir(
TestDeletedBlockLog.class.getSimpleName());
conf = new OzoneConfiguration();
conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
conf.set(OZONE_CONTAINER_METADATA_DIRS, testDir.getAbsolutePath());
deletedBlockLog = new DeletedBlockLogImpl(conf);
}
@After
public void tearDown() throws Exception {
deletedBlockLog.close();
FileUtils.deleteDirectory(testDir);
}
private Map<String, List<String>> generateData(int dataSize) {
Map<String, List<String>> blockMap = new HashMap<>();
Random random = new Random(1);
for (int i = 0; i < dataSize; i++) {
String containerName = "container-" + UUID.randomUUID().toString();
List<String> blocks = new ArrayList<>();
int blockSize = random.nextInt(30) + 1;
for (int j = 0; j < blockSize; j++) {
blocks.add("block-" + UUID.randomUUID().toString());
}
blockMap.put(containerName, blocks);
}
return blockMap;
}
@Test
public void testGetTransactions() throws Exception {
List<DeletedBlocksTransaction> blocks =
deletedBlockLog.getTransactions(30);
Assert.assertEquals(0, blocks.size());
// Creates 40 TX in the log.
for (Map.Entry<String, List<String>> entry : generateData(40).entrySet()){
deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
}
// Get first 30 TXs.
blocks = deletedBlockLog.getTransactions(30);
Assert.assertEquals(30, blocks.size());
for (int i = 0; i < 30; i++) {
Assert.assertEquals(i + 1, blocks.get(i).getTxID());
}
// Get another 30 TXs.
// The log only 10 left, so this time it will only return 10 TXs.
blocks = deletedBlockLog.getTransactions(30);
Assert.assertEquals(10, blocks.size());
for (int i = 30; i < 40; i++) {
Assert.assertEquals(i + 1, blocks.get(i - 30).getTxID());
}
// Get another 50 TXs.
// By now the position should have moved to the beginning,
// this call will return all 40 TXs.
blocks = deletedBlockLog.getTransactions(50);
Assert.assertEquals(40, blocks.size());
for (int i = 0; i < 40; i++) {
Assert.assertEquals(i + 1, blocks.get(i).getTxID());
}
List<Long> txIDs = new ArrayList<>();
for (DeletedBlocksTransaction block : blocks) {
txIDs.add(block.getTxID());
}
deletedBlockLog.commitTransactions(txIDs);
}
@Test
public void testIncrementCount() throws Exception {
int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
// Create 30 TXs in the log.
for (Map.Entry<String, List<String>> entry : generateData(30).entrySet()){
deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
}
// This will return all TXs, total num 30.
List<DeletedBlocksTransaction> blocks =
deletedBlockLog.getTransactions(40);
List<Long> txIDs = blocks.stream().map(DeletedBlocksTransaction::getTxID)
.collect(Collectors.toList());
for (int i = 0; i < maxRetry; i++) {
deletedBlockLog.incrementCount(txIDs);
}
// Increment another time so it exceed the maxRetry.
// On this call, count will be set to -1 which means TX eventually fails.
deletedBlockLog.incrementCount(txIDs);
blocks = deletedBlockLog.getTransactions(40);
for (DeletedBlocksTransaction block : blocks) {
Assert.assertEquals(-1, block.getCount());
}
// If all TXs are failed, getTransactions call will always return nothing.
blocks = deletedBlockLog.getTransactions(40);
Assert.assertEquals(blocks.size(), 0);
}
@Test
public void testCommitTransactions() throws Exception {
for (Map.Entry<String, List<String>> entry : generateData(50).entrySet()){
deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
}
List<DeletedBlocksTransaction> blocks =
deletedBlockLog.getTransactions(20);
List<Long> txIDs = new ArrayList<>();
for (DeletedBlocksTransaction block : blocks) {
txIDs.add(block.getTxID());
}
// Add an invalid txID.
txIDs.add(70L);
deletedBlockLog.commitTransactions(txIDs);
blocks = deletedBlockLog.getTransactions(50);
Assert.assertEquals(30, blocks.size());
}
@Test
public void testRandomOperateTransactions() throws Exception {
Random random = new Random();
int added = 0, committed = 0;
List<DeletedBlocksTransaction> blocks = new ArrayList<>();
List<Long> txIDs = new ArrayList<>();
byte[] latestTxid = DFSUtil.string2Bytes("#LATEST_TXID#");
MetadataKeyFilters.MetadataKeyFilter avoidLatestTxid =
(preKey, currentKey, nextKey) ->
!Arrays.equals(latestTxid, currentKey);
MetadataStore store = deletedBlockLog.getDeletedStore();
// Randomly add/get/commit/increase transactions.
for (int i = 0; i < 100; i++) {
int state = random.nextInt(4);
if (state == 0) {
for (Map.Entry<String, List<String>> entry :
generateData(10).entrySet()){
deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
}
added += 10;
} else if (state == 1) {
blocks = deletedBlockLog.getTransactions(20);
txIDs = new ArrayList<>();
for (DeletedBlocksTransaction block : blocks) {
txIDs.add(block.getTxID());
}
deletedBlockLog.incrementCount(txIDs);
} else if (state == 2) {
txIDs = new ArrayList<>();
for (DeletedBlocksTransaction block : blocks) {
txIDs.add(block.getTxID());
}
blocks = new ArrayList<>();
committed += txIDs.size();
deletedBlockLog.commitTransactions(txIDs);
} else {
// verify the number of added and committed.
List<Map.Entry<byte[], byte[]>> result =
store.getRangeKVs(null, added, avoidLatestTxid);
Assert.assertEquals(added, result.size() + committed);
}
}
}
@Test
public void testPersistence() throws Exception {
for (Map.Entry<String, List<String>> entry : generateData(50).entrySet()){
deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
}
// close db and reopen it again to make sure
// transactions are stored persistently.
deletedBlockLog.close();
deletedBlockLog = new DeletedBlockLogImpl(conf);
List<DeletedBlocksTransaction> blocks =
deletedBlockLog.getTransactions(10);
List<Long> txIDs = new ArrayList<>();
for (DeletedBlocksTransaction block : blocks) {
txIDs.add(block.getTxID());
}
deletedBlockLog.commitTransactions(txIDs);
blocks = deletedBlockLog.getTransactions(10);
Assert.assertEquals(10, blocks.size());
}
}