HDDS-160:Refactor KeyManager, ChunkManager. Contributed by Bharat Viswanadham

This commit is contained in:
Bharat Viswanadham 2018-06-15 14:35:33 -07:00
parent 998e2850a3
commit ca192cb7c9
9 changed files with 1331 additions and 3 deletions

View File

@ -0,0 +1,240 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ExecutionException;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_INTERNAL_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.NO_SUCH_ALGORITHM;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
/**
* This class is for performing chunk related operations.
*/
public class ChunkManagerImpl implements ChunkManager {
static final Logger LOG = LoggerFactory.getLogger(ChunkManagerImpl.class);
/**
* writes a given chunk.
*
* @param container - Container for the chunk
* @param blockID - ID of the block
* @param info - ChunkInfo
* @param data - data of the chunk
* @param stage - Stage of the Chunk operation
* @throws StorageContainerException
*/
public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
byte[] data, ContainerProtos.Stage stage)
throws StorageContainerException {
try {
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
File chunkFile = ChunkUtils.validateChunk(containerData, info);
File tmpChunkFile = getTmpChunkFile(chunkFile, info);
LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file",
info.getChunkName(), stage, chunkFile, tmpChunkFile);
switch (stage) {
case WRITE_DATA:
// Initially writes to temporary chunk file.
ChunkUtils.writeData(tmpChunkFile, info, data);
break;
case COMMIT_DATA:
// commit the data, means move chunk data from temporary chunk file
// to actual chunk file.
long sizeDiff = tmpChunkFile.length() - chunkFile.length();
commitChunk(tmpChunkFile, chunkFile);
containerData.incrBytesUsed(sizeDiff);
containerData.incrWriteCount();
containerData.incrWriteBytes(sizeDiff);
break;
case COMBINED:
// directly write to the chunk file
ChunkUtils.writeData(chunkFile, info, data);
containerData.incrBytesUsed(info.getLen());
containerData.incrWriteCount();
containerData.incrWriteBytes(info.getLen());
break;
default:
throw new IOException("Can not identify write operation.");
}
} catch (StorageContainerException ex) {
throw ex;
} catch (NoSuchAlgorithmException ex) {
LOG.error("write data failed. error: {}", ex);
throw new StorageContainerException("Internal error: ", ex,
NO_SUCH_ALGORITHM);
} catch (ExecutionException | IOException ex) {
LOG.error("write data failed. error: {}", ex);
throw new StorageContainerException("Internal error: ", ex,
CONTAINER_INTERNAL_ERROR);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("write data failed. error: {}", e);
throw new StorageContainerException("Internal error: ", e,
CONTAINER_INTERNAL_ERROR);
}
}
/**
* reads the data defined by a chunk.
*
* @param container - Container for the chunk
* @param blockID - ID of the block.
* @param info - ChunkInfo.
* @return byte array
* @throws StorageContainerException
* TODO: Right now we do not support partial reads and writes of chunks.
* TODO: Explore if we need to do that for ozone.
*/
public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info)
throws StorageContainerException {
try {
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
ByteBuffer data;
// Checking here, which layout version the container is, and reading
// the chunk file in that format.
// In version1, we verify checksum if it is available and return data
// of the chunk file.
if (containerData.getLayOutVersion() == ChunkLayOutVersion
.getLatestVersion().getVersion()) {
File chunkFile = ChunkUtils.getChunkFile(containerData, info);
data = ChunkUtils.readData(chunkFile, info);
containerData.incrReadCount();
containerData.incrReadBytes(chunkFile.length());
return data.array();
}
} catch(NoSuchAlgorithmException ex) {
LOG.error("read data failed. error: {}", ex);
throw new StorageContainerException("Internal error: ",
ex, NO_SUCH_ALGORITHM);
} catch (ExecutionException ex) {
LOG.error("read data failed. error: {}", ex);
throw new StorageContainerException("Internal error: ",
ex, CONTAINER_INTERNAL_ERROR);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("read data failed. error: {}", e);
throw new StorageContainerException("Internal error: ",
e, CONTAINER_INTERNAL_ERROR);
}
return null;
}
/**
* Deletes a given chunk.
*
* @param container - Container for the chunk
* @param blockID - ID of the block
* @param info - Chunk Info
* @throws StorageContainerException
*/
public void deleteChunk(Container container, BlockID blockID, ChunkInfo info)
throws StorageContainerException {
Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
// Checking here, which layout version the container is, and performing
// deleting chunk operation.
// In version1, we have only chunk file.
if (containerData.getLayOutVersion() == ChunkLayOutVersion
.getLatestVersion().getVersion()) {
File chunkFile = ChunkUtils.getChunkFile(containerData, info);
if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
FileUtil.fullyDelete(chunkFile);
containerData.decrBytesUsed(chunkFile.length());
} else {
LOG.error("Not Supported Operation. Trying to delete a " +
"chunk that is in shared file. chunk info : " + info.toString());
throw new StorageContainerException("Not Supported Operation. " +
"Trying to delete a chunk that is in shared file. chunk info : "
+ info.toString(), UNSUPPORTED_REQUEST);
}
}
}
/**
* Shutdown the chunkManager.
*
* In the chunkManager we haven't acquired any resources, so nothing to do
* here.
*/
public void shutdown() {
//TODO: need to revisit this during integration of container IO.
}
/**
* Returns the temporary chunkFile path.
* @param chunkFile
* @param info
* @return temporary chunkFile path
* @throws StorageContainerException
*/
private File getTmpChunkFile(File chunkFile, ChunkInfo info)
throws StorageContainerException {
return new File(chunkFile.getParent(),
chunkFile.getName() +
OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX);
}
/**
* Commit the chunk by renaming the temporary chunk file to chunk file.
* @param tmpChunkFile
* @param chunkFile
* @throws IOException
*/
private void commitChunk(File tmpChunkFile, File chunkFile) throws
IOException {
Files.move(tmpChunkFile.toPath(), chunkFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
}
}

View File

@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.utils.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_KEY;
/**
* This class is for performing key related operations on the KeyValue
* Container.
*/
public class KeyManagerImpl implements KeyManager {
static final Logger LOG = LoggerFactory.getLogger(KeyManagerImpl.class);
private Configuration config;
/**
* Constructs a key Manager.
*
* @param conf - Ozone configuration
*/
public KeyManagerImpl(Configuration conf) {
Preconditions.checkNotNull(conf, "Config cannot be null");
this.config = conf;
}
/**
* Puts or overwrites a key.
*
* @param container - Container for which key need to be added.
* @param data - Key Data.
* @throws IOException
*/
public void putKey(Container container, KeyData data) throws IOException {
Preconditions.checkNotNull(data, "KeyData cannot be null for put " +
"operation.");
Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
"cannot be negative");
// We are not locking the key manager since LevelDb serializes all actions
// against a single DB. We rely on DB level locking to avoid conflicts.
MetadataStore db = KeyUtils.getDB((KeyValueContainerData) container
.getContainerData(), config);
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
db.put(Longs.toByteArray(data.getLocalID()), data.getProtoBufMessage()
.toByteArray());
}
/**
* Gets an existing key.
*
* @param container - Container from which key need to be get.
* @param data - Key Data.
* @return Key Data.
* @throws IOException
*/
public KeyData getKey(Container container, KeyData data) throws IOException {
Preconditions.checkNotNull(data, "Key data cannot be null");
Preconditions.checkNotNull(data.getContainerID(), "Container name cannot" +
" be null");
KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
MetadataStore db = KeyUtils.getDB(containerData, config);
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
byte[] kData = db.get(Longs.toByteArray(data.getLocalID()));
if (kData == null) {
throw new StorageContainerException("Unable to find the key.",
NO_SUCH_KEY);
}
ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(kData);
return KeyData.getFromProtoBuf(keyData);
}
/**
* Deletes an existing Key.
*
* @param container - Container from which key need to be deleted.
* @param blockID - ID of the block.
* @throws StorageContainerException
*/
public void deleteKey(Container container, BlockID blockID) throws
IOException {
Preconditions.checkNotNull(blockID, "block ID cannot be null.");
Preconditions.checkState(blockID.getContainerID() >= 0,
"Container ID cannot be negative.");
Preconditions.checkState(blockID.getLocalID() >= 0,
"Local ID cannot be negative.");
KeyValueContainerData cData = (KeyValueContainerData) container
.getContainerData();
MetadataStore db = KeyUtils.getDB(cData, config);
// This is a post condition that acts as a hint to the user.
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
// Note : There is a race condition here, since get and delete
// are not atomic. Leaving it here since the impact is refusing
// to delete a key which might have just gotten inserted after
// the get check.
byte[] kKey = Longs.toByteArray(blockID.getLocalID());
byte[] kData = db.get(kKey);
if (kData == null) {
throw new StorageContainerException("Unable to find the key.",
NO_SUCH_KEY);
}
db.delete(kKey);
}
/**
* List keys in a container.
*
* @param container - Container from which keys need to be listed.
* @param startLocalID - Key to start from, 0 to begin.
* @param count - Number of keys to return.
* @return List of Keys that match the criteria.
*/
public List<KeyData> listKey(Container container, long startLocalID, int
count) throws IOException {
Preconditions.checkNotNull(container, "container cannot be null");
Preconditions.checkState(startLocalID >= 0, "startLocal ID cannot be " +
"negative");
Preconditions.checkArgument(count > 0,
"Count must be a positive number.");
container.readLock();
List<KeyData> result = null;
KeyValueContainerData cData = (KeyValueContainerData) container
.getContainerData();
MetadataStore db = KeyUtils.getDB(cData, config);
result = new ArrayList<>();
byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
List<Map.Entry<byte[], byte[]>> range = db.getSequentialRangeKVs(
startKeyInBytes, count, null);
for (Map.Entry<byte[], byte[]> entry : range) {
KeyData value = KeyUtils.getKeyData(entry.getValue());
KeyData data = new KeyData(value.getBlockID());
result.add(data);
}
return result;
}
/**
* Shutdown KeyValueContainerManager.
*/
public void shutdown() {
KeyUtils.shutdownCache(ContainerCache.getInstance(config));
}
}

View File

@ -0,0 +1,295 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue.helpers;
import com.google.common.base.Preconditions;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileLock;
import java.nio.file.StandardOpenOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ExecutionException;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
/**
* Utility methods for chunk operations for KeyValue container.
*/
public final class ChunkUtils {
/** Never constructed. **/
private ChunkUtils() {
}
/**
* Writes the data in chunk Info to the specified location in the chunkfile.
*
* @param chunkFile - File to write data to.
* @param chunkInfo - Data stream to write.
* @param data - The data buffer.
* @throws StorageContainerException
*/
public static void writeData(File chunkFile, ChunkInfo chunkInfo,
byte[] data) throws
StorageContainerException, ExecutionException, InterruptedException,
NoSuchAlgorithmException {
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
if (data.length != chunkInfo.getLen()) {
String err = String.format("data array does not match the length " +
"specified. DataLen: %d Byte Array: %d",
chunkInfo.getLen(), data.length);
log.error(err);
throw new StorageContainerException(err, INVALID_WRITE_SIZE);
}
AsynchronousFileChannel file = null;
FileLock lock = null;
try {
file =
AsynchronousFileChannel.open(chunkFile.toPath(),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE,
StandardOpenOption.SPARSE,
StandardOpenOption.SYNC);
lock = file.lock().get();
if (chunkInfo.getChecksum() != null &&
!chunkInfo.getChecksum().isEmpty()) {
verifyChecksum(chunkInfo, data, log);
}
int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();
if (size != data.length) {
log.error("Invalid write size found. Size:{} Expected: {} ", size,
data.length);
throw new StorageContainerException("Invalid write size found. " +
"Size: " + size + " Expected: " + data.length, INVALID_WRITE_SIZE);
}
} catch (StorageContainerException ex) {
throw ex;
} catch(IOException e) {
throw new StorageContainerException(e, IO_EXCEPTION);
} finally {
if (lock != null) {
try {
lock.release();
} catch (IOException e) {
log.error("Unable to release lock ??, Fatal Error.");
throw new StorageContainerException(e, CONTAINER_INTERNAL_ERROR);
}
}
if (file != null) {
try {
file.close();
} catch (IOException e) {
throw new StorageContainerException("Error closing chunk file",
e, CONTAINER_INTERNAL_ERROR);
}
}
}
}
/**
* Reads data from an existing chunk file.
*
* @param chunkFile - file where data lives.
* @param data - chunk definition.
* @return ByteBuffer
* @throws StorageContainerException
* @throws ExecutionException
* @throws InterruptedException
*/
public static ByteBuffer readData(File chunkFile, ChunkInfo data) throws
StorageContainerException, ExecutionException, InterruptedException,
NoSuchAlgorithmException {
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
if (!chunkFile.exists()) {
log.error("Unable to find the chunk file. chunk info : {}",
data.toString());
throw new StorageContainerException("Unable to find the chunk file. " +
"chunk info " +
data.toString(), UNABLE_TO_FIND_CHUNK);
}
AsynchronousFileChannel file = null;
FileLock lock = null;
try {
file =
AsynchronousFileChannel.open(chunkFile.toPath(),
StandardOpenOption.READ);
lock = file.lock(data.getOffset(), data.getLen(), true).get();
ByteBuffer buf = ByteBuffer.allocate((int) data.getLen());
file.read(buf, data.getOffset()).get();
if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
verifyChecksum(data, buf.array(), log);
}
return buf;
} catch (IOException e) {
throw new StorageContainerException(e, IO_EXCEPTION);
} finally {
if (lock != null) {
try {
lock.release();
} catch (IOException e) {
log.error("I/O error is lock release.");
}
}
if (file != null) {
IOUtils.closeStream(file);
}
}
}
/**
* Verifies the checksum of a chunk against the data buffer.
*
* @param chunkInfo - Chunk Info.
* @param data - data buffer
* @param log - log
* @throws NoSuchAlgorithmException
* @throws StorageContainerException
*/
private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger
log) throws NoSuchAlgorithmException, StorageContainerException {
MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
sha.update(data);
if (!Hex.encodeHexString(sha.digest()).equals(
chunkInfo.getChecksum())) {
log.error("Checksum mismatch. Provided: {} , computed: {}",
chunkInfo.getChecksum(), DigestUtils.sha256Hex(sha.digest()));
throw new StorageContainerException("Checksum mismatch. Provided: " +
chunkInfo.getChecksum() + " , computed: " +
DigestUtils.sha256Hex(sha.digest()), CHECKSUM_MISMATCH);
}
}
/**
* Validates chunk data and returns a file object to Chunk File that we are
* expected to write data to.
*
* @param data - container data.
* @param info - chunk info.
* @return File
* @throws StorageContainerException
*/
public static File validateChunk(KeyValueContainerData data, ChunkInfo info)
throws StorageContainerException {
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
File chunkFile = getChunkFile(data, info);
if (isOverWriteRequested(chunkFile, info)) {
if (!isOverWritePermitted(info)) {
log.error("Rejecting write chunk request. Chunk overwrite " +
"without explicit request. {}", info.toString());
throw new StorageContainerException("Rejecting write chunk request. " +
"OverWrite flag required." + info.toString(),
OVERWRITE_FLAG_REQUIRED);
}
}
return chunkFile;
}
/**
* Validates that Path to chunk file exists.
*
* @param containerData - Container Data
* @param info - Chunk info
* @return - File.
* @throws StorageContainerException
*/
public static File getChunkFile(KeyValueContainerData containerData,
ChunkInfo info) throws
StorageContainerException {
Preconditions.checkNotNull(containerData, "Container data can't be null");
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
String chunksPath = containerData.getChunksPath();
if (chunksPath == null) {
log.error("Chunks path is null in the container data");
throw new StorageContainerException("Unable to get Chunks directory.",
UNABLE_TO_FIND_DATA_DIR);
}
File chunksLoc = new File(chunksPath);
if (!chunksLoc.exists()) {
log.error("Chunks path does not exist");
throw new StorageContainerException("Unable to get Chunks directory.",
UNABLE_TO_FIND_DATA_DIR);
}
return chunksLoc.toPath().resolve(info.getChunkName()).toFile();
}
/**
* Checks if we are getting a request to overwrite an existing range of
* chunk.
*
* @param chunkFile - File
* @param chunkInfo - Buffer to write
* @return bool
*/
public static boolean isOverWriteRequested(File chunkFile, ChunkInfo
chunkInfo) {
if (!chunkFile.exists()) {
return false;
}
long offset = chunkInfo.getOffset();
return offset < chunkFile.length();
}
/**
* Overwrite is permitted if an only if the user explicitly asks for it. We
* permit this iff the key/value pair contains a flag called
* [OverWriteRequested, true].
*
* @param chunkInfo - Chunk info
* @return true if the user asks for it.
*/
public static boolean isOverWritePermitted(ChunkInfo chunkInfo) {
String overWrite = chunkInfo.getMetadata().get(OzoneConsts.CHUNK_OVERWRITE);
return (overWrite != null) &&
(!overWrite.isEmpty()) &&
(Boolean.valueOf(overWrite));
}
}

View File

@ -20,14 +20,19 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData; import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache; import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.utils.MetadataStore; import org.apache.hadoop.utils.MetadataStore;
import java.io.IOException; import java.io.IOException;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNABLE_TO_READ_METADATA_DB; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.NO_SUCH_KEY;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.UNABLE_TO_READ_METADATA_DB;
/** /**
* Utils functions to help key functions. * Utils functions to help key functions.
@ -79,4 +84,32 @@ public final class KeyUtils {
cache.removeDB(container.getContainerId()); cache.removeDB(container.getContainerId());
} }
/**
* Shutdown all DB Handles.
*
* @param cache - Cache for DB Handles.
*/
@SuppressWarnings("unchecked")
public static void shutdownCache(ContainerCache cache) {
cache.shutdownCache();
}
/**
* Parses the {@link KeyData} from a bytes array.
*
* @param bytes key data in bytes.
* @return key data.
* @throws IOException if the bytes array is malformed or invalid.
*/
public static KeyData getKeyData(byte[] bytes) throws IOException {
try {
ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(
bytes);
KeyData data = KeyData.getFromProtoBuf(keyData);
return data;
} catch (IOException e) {
throw new StorageContainerException("Failed to parse key data from the" +
" bytes array.", NO_SUCH_KEY);
}
}
} }

View File

@ -0,0 +1,80 @@
package org.apache.hadoop.ozone.container.keyvalue.interfaces;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
/**
* Chunk Manager allows read, write, delete and listing of chunks in
* a container.
*/
public interface ChunkManager {
/**
* writes a given chunk.
*
* @param container - Container for the chunk
* @param blockID - ID of the block.
* @param info - ChunkInfo.
* @param stage - Chunk Stage write.
* @throws StorageContainerException
*/
void writeChunk(Container container, BlockID blockID, ChunkInfo info,
byte[] data, ContainerProtos.Stage stage)
throws StorageContainerException;
/**
* reads the data defined by a chunk.
*
* @param container - Container for the chunk
* @param blockID - ID of the block.
* @param info - ChunkInfo.
* @return byte array
* @throws StorageContainerException
*
* TODO: Right now we do not support partial reads and writes of chunks.
* TODO: Explore if we need to do that for ozone.
*/
byte[] readChunk(Container container, BlockID blockID, ChunkInfo info) throws
StorageContainerException;
/**
* Deletes a given chunk.
*
* @param container - Container for the chunk
* @param blockID - ID of the block.
* @param info - Chunk Info
* @throws StorageContainerException
*/
void deleteChunk(Container container, BlockID blockID, ChunkInfo info) throws
StorageContainerException;
// TODO : Support list operations.
/**
* Shutdown the chunkManager.
*/
void shutdown();
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.keyvalue.interfaces;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import java.io.IOException;
import java.util.List;
/**
* KeyManager is for performing key related operations on the container.
*/
public interface KeyManager {
/**
* Puts or overwrites a key.
*
* @param container - Container for which key need to be added.
* @param data - Key Data.
* @throws IOException
*/
void putKey(Container container, KeyData data) throws IOException;
/**
* Gets an existing key.
*
* @param container - Container from which key need to be get.
* @param data - Key Data.
* @return Key Data.
* @throws IOException
*/
KeyData getKey(Container container, KeyData data) throws IOException;
/**
* Deletes an existing Key.
*
* @param container - Container from which key need to be deleted.
* @param blockID - ID of the block.
* @throws StorageContainerException
*/
void deleteKey(Container container, BlockID blockID) throws IOException;
/**
* List keys in a container.
*
* @param container - Container from which keys need to be listed.
* @param startLocalID - Key to start from, 0 to begin.
* @param count - Number of keys to return.
* @return List of Keys that match the criteria.
*/
List<KeyData> listKey(Container container, long startLocalID, int count) throws
IOException;
/**
* Shutdown ContainerManager.
*/
void shutdown();
}

View File

@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import java.io.File;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
/**
* This class is used to test ChunkManager operations.
*/
public class TestChunkManagerImpl {
private OzoneConfiguration config;
private String scmId = UUID.randomUUID().toString();
private VolumeSet volumeSet;
private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
private KeyValueContainerData keyValueContainerData;
private KeyValueContainer keyValueContainer;
private KeyData keyData;
private BlockID blockID;
private ChunkManagerImpl chunkManager;
private ChunkInfo chunkInfo;
private byte[] data;
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@Before
public void setUp() throws Exception {
config = new OzoneConfiguration();
HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
.getAbsolutePath()).conf(config).datanodeUuid(UUID.randomUUID()
.toString()).build();
volumeSet = mock(VolumeSet.class);
volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class);
Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
.thenReturn(hddsVolume);
keyValueContainerData = new KeyValueContainerData(
ContainerProtos.ContainerType.KeyValueContainer, 1L);
keyValueContainer = new KeyValueContainer(
keyValueContainerData, config);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
data = "testing write chunks".getBytes();
// Creating KeyData
blockID = new BlockID(1L, 1L);
keyData = new KeyData(blockID);
keyData.addMetadata("VOLUME", "ozone");
keyData.addMetadata("OWNER", "hdfs");
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, data.length);
chunkList.add(chunkInfo.getProtoBufMessage());
keyData.setChunks(chunkList);
// Create a ChunkManager object.
chunkManager = new ChunkManagerImpl();
}
@Test
public void testWriteChunkStageWriteAndCommit() throws Exception {
//As in Setup, we try to create container, these paths should exist.
assertTrue(keyValueContainerData.getChunksPath() != null);
File chunksPath = new File(keyValueContainerData.getChunksPath());
assertTrue(chunksPath.exists());
// Initially chunks folder should be empty.
assertTrue(chunksPath.listFiles().length == 0);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.WRITE_DATA);
// Now a chunk file is being written with Stage WRITE_DATA, so it should
// create a temporary chunk file.
assertTrue(chunksPath.listFiles().length == 1);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMMIT_DATA);
// Old temp file should have been renamed to chunk file.
assertTrue(chunksPath.listFiles().length == 1);
}
@Test
public void testWriteChunkIncorrectLength() throws Exception {
try {
long randomLength = 200L;
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, randomLength);
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
chunkList.add(chunkInfo.getProtoBufMessage());
keyData.setChunks(chunkList);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.WRITE_DATA);
fail("testWriteChunkIncorrectLength failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("data array does not match " +
"the length ", ex);
assertEquals(ContainerProtos.Result.INVALID_WRITE_SIZE, ex.getResult());
}
}
@Test
public void testWriteChunkStageCombinedData() throws Exception {
//As in Setup, we try to create container, these paths should exist.
assertTrue(keyValueContainerData.getChunksPath() != null);
File chunksPath = new File(keyValueContainerData.getChunksPath());
assertTrue(chunksPath.exists());
// Initially chunks folder should be empty.
assertTrue(chunksPath.listFiles().length == 0);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
// Now a chunk file is being written with Stage WRITE_DATA, so it should
// create a temporary chunk file.
assertTrue(chunksPath.listFiles().length == 1);
}
@Test
public void testReadChunk() throws Exception {
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
chunkInfo);
assertEquals(expectedData.length, data.length);
assertTrue(Arrays.equals(expectedData, data));
}
@Test
public void testDeleteChunk() throws Exception {
File chunksPath = new File(keyValueContainerData.getChunksPath());
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
assertTrue(chunksPath.listFiles().length == 1);
chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
assertTrue(chunksPath.listFiles().length == 0);
}
@Test
public void testDeleteChunkUnsupportedRequest() throws Exception {
try {
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
long randomLength = 200L;
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, randomLength);
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
chunkList.add(chunkInfo.getProtoBufMessage());
keyData.setChunks(chunkList);
chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
fail("testDeleteChunkUnsupportedRequest");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("Not Supported Operation.", ex);
assertEquals(ContainerProtos.Result.UNSUPPORTED_REQUEST, ex.getResult());
}
}
@Test
public void testWriteChunkChecksumMismatch() throws Exception {
try {
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, data.length);
//Setting checksum to some value.
chunkInfo.setChecksum("some garbage");
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
chunkList.add(chunkInfo.getProtoBufMessage());
keyData.setChunks(chunkList);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
fail("testWriteChunkChecksumMismatch failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("Checksum mismatch.", ex);
assertEquals(ContainerProtos.Result.CHECKSUM_MISMATCH, ex.getResult());
}
}
@Test
public void testReadChunkFileNotExists() throws Exception {
try {
// trying to read a chunk, where chunk file does not exist
byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
chunkInfo);
fail("testReadChunkFileNotExists failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("Unable to find the chunk " +
"file.", ex);
assertEquals(ContainerProtos.Result.UNABLE_TO_FIND_CHUNK, ex.getResult());
}
}
}

View File

@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.volume
.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
/**
* This class is used to test key related operations on the container.
*/
public class TestKeyManagerImpl {
private OzoneConfiguration config;
private String scmId = UUID.randomUUID().toString();
private VolumeSet volumeSet;
private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
private KeyValueContainerData keyValueContainerData;
private KeyValueContainer keyValueContainer;
private KeyData keyData;
private KeyManagerImpl keyValueContainerManager;
private BlockID blockID;
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@Before
public void setUp() throws Exception {
config = new OzoneConfiguration();
HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
.getAbsolutePath()).conf(config).datanodeUuid(UUID.randomUUID()
.toString()).build();
volumeSet = mock(VolumeSet.class);
volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class);
Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
.thenReturn(hddsVolume);
keyValueContainerData = new KeyValueContainerData(
ContainerProtos.ContainerType.KeyValueContainer, 1L);
keyValueContainer = new KeyValueContainer(
keyValueContainerData, config);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
// Creating KeyData
blockID = new BlockID(1L, 1L);
keyData = new KeyData(blockID);
keyData.addMetadata("VOLUME", "ozone");
keyData.addMetadata("OWNER", "hdfs");
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, 1024);
chunkList.add(info.getProtoBufMessage());
keyData.setChunks(chunkList);
// Create KeyValueContainerManager
keyValueContainerManager = new KeyManagerImpl(config);
}
@Test
public void testPutAndGetKey() throws Exception {
//Put Key
keyValueContainerManager.putKey(keyValueContainer, keyData);
//Get Key
KeyData fromGetKeyData = keyValueContainerManager.getKey(keyValueContainer,
keyData);
assertEquals(keyData.getContainerID(), fromGetKeyData.getContainerID());
assertEquals(keyData.getLocalID(), fromGetKeyData.getLocalID());
assertEquals(keyData.getChunks().size(), fromGetKeyData.getChunks().size());
assertEquals(keyData.getMetadata().size(), fromGetKeyData.getMetadata()
.size());
}
@Test
public void testDeleteKey() throws Exception {
try {
//Put Key
keyValueContainerManager.putKey(keyValueContainer, keyData);
//Delete Key
keyValueContainerManager.deleteKey(keyValueContainer, blockID);
} catch (IOException ex) {
fail("testDeleteKey failed");
}
}
@Test
public void testListKey() throws Exception {
try {
keyValueContainerManager.putKey(keyValueContainer, keyData);
List<KeyData> listKeyData = keyValueContainerManager.listKey(
keyValueContainer, 1, 10);
assertNotNull(listKeyData);
assertTrue(listKeyData.size() == 1);
for (long i = 2; i <= 10; i++) {
blockID = new BlockID(1L, i);
keyData = new KeyData(blockID);
keyData.addMetadata("VOLUME", "ozone");
keyData.addMetadata("OWNER", "hdfs");
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, 1024);
chunkList.add(info.getProtoBufMessage());
keyData.setChunks(chunkList);
keyValueContainerManager.putKey(keyValueContainer, keyData);
}
listKeyData = keyValueContainerManager.listKey(
keyValueContainer, 1, 10);
assertNotNull(listKeyData);
assertTrue(listKeyData.size() == 10);
} catch (IOException ex) {
fail("testListKey failed");
}
}
@Test
public void testGetNoSuchKey() throws Exception {
try {
keyData = new KeyData(new BlockID(1L, 2L));
keyValueContainerManager.getKey(keyValueContainer, keyData);
fail("testGetNoSuchKey failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("Unable to find the key.", ex);
assertEquals(ContainerProtos.Result.NO_SUCH_KEY, ex.getResult());
}
}
}