From 52d1d9603ecc03dbe3ef5fafa60377ef461ecca3 Mon Sep 17 00:00:00 2001 From: Bharat Viswanadham Date: Thu, 28 Jun 2018 14:07:52 -0700 Subject: [PATCH] HDDS-183:Integrate Volumeset, ContainerSet and HddsDispatcher. Contributed by Bharat Viswanadham --- .../proto/DatanodeContainerProtocol.proto | 1 + .../container/common/impl/ContainerData.java | 1 + .../impl/ContainerDataYaml.java} | 96 +++-- .../container/common/impl/Dispatcher.java | 11 + .../container/common/impl/HddsDispatcher.java | 28 +- .../interfaces/ContainerDispatcher.java | 14 + .../container/common/interfaces/Handler.java | 15 +- .../statemachine/DatanodeStateMachine.java | 4 +- .../background/BlockDeletingService.java | 1 + .../DeleteBlocksCommandHandler.java | 56 ++- .../states/datanode/RunningDatanodeState.java | 3 +- .../states/endpoint/VersionEndpointTask.java | 34 +- .../container/common/volume/HddsVolume.java | 6 + .../container/common/volume/VolumeSet.java | 48 +++ .../container/keyvalue/KeyValueContainer.java | 52 +-- .../keyvalue/KeyValueContainerData.java | 22 +- .../container/keyvalue/KeyValueHandler.java | 22 +- .../helpers/KeyValueContainerUtil.java | 134 ++++++ .../container/ozoneimpl/ContainerReader.java | 157 +++++++ .../container/ozoneimpl/OzoneContainer.java | 398 ++++++++---------- .../ozone/protocol/VersionResponse.java | 4 + .../ozone/container/common/SCMTestUtils.java | 13 +- .../ozone/container/common/ScmTestMock.java | 4 + .../common/TestKeyValueContainerData.java | 3 +- ...ueYaml.java => TestContainerDataYaml.java} | 25 +- .../common/impl/TestContainerSet.java | 6 +- .../common/interfaces/TestHandler.java | 8 +- .../keyvalue/TestChunkManagerImpl.java | 3 +- .../keyvalue/TestKeyManagerImpl.java | 3 +- .../keyvalue/TestKeyValueContainer.java | 10 +- .../keyvalue/TestKeyValueHandler.java | 8 +- .../ozoneimpl/TestOzoneContainer.java | 108 +++++ .../hadoop/hdds/scm/node/SCMNodeManager.java | 4 + .../ozone/container/common/TestEndPoint.java | 20 +- .../TestStorageContainerManagerHelper.java | 11 +- .../TestCloseContainerByPipeline.java | 6 +- .../TestCloseContainerHandler.java | 6 +- .../ozoneimpl/TestOzoneContainer.java | 13 +- .../container/server/TestContainerServer.java | 10 + .../ksm/TestContainerReportWithKeys.java | 14 +- .../hadoop/ozone/web/client/TestKeys.java | 22 +- 41 files changed, 955 insertions(+), 449 deletions(-) rename hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/{keyvalue/KeyValueYaml.java => common/impl/ContainerDataYaml.java} (75%) create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java rename hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/{TestKeyValueYaml.java => TestContainerDataYaml.java} (88%) create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index d29e479ab9..ff1582e8c8 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -137,6 +137,7 @@ enum Result { CONTAINER_METADATA_ERROR = 31; CONTAINER_FILES_CREATE_ERROR = 32; CONTAINER_CHECKSUM_ERROR = 33; + UNKNOWN_CONTAINER_TYPE = 34; } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java index 0bd7795a26..b11b66c183 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone.container.common.impl; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. ContainerType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java similarity index 75% rename from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java rename to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java index 64f7152b0c..6b8e6ee4c5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java @@ -16,11 +16,12 @@ * limitations under the License. */ -package org.apache.hadoop.ozone.container.keyvalue; +package org.apache.hadoop.ozone.container.common.impl; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.yaml.snakeyaml.Yaml; import java.beans.IntrospectionException; @@ -46,13 +47,16 @@ import org.yaml.snakeyaml.nodes.Tag; import org.yaml.snakeyaml.representer.Representer; +import static org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData.YAML_FIELDS; +import static org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData.YAML_TAG; + /** * Class for creating and reading .container files. */ -public final class KeyValueYaml { +public final class ContainerDataYaml { - private KeyValueYaml() { + private ContainerDataYaml() { } /** @@ -62,29 +66,39 @@ private KeyValueYaml() { * @param containerData * @throws IOException */ - public static void createContainerFile(File containerFile, ContainerData - containerData) throws IOException { + public static void createContainerFile(ContainerProtos.ContainerType + containerType, File containerFile, + ContainerData containerData) throws + IOException { Preconditions.checkNotNull(containerFile, "yamlFile cannot be null"); Preconditions.checkNotNull(containerData, "containerData cannot be null"); + Preconditions.checkNotNull(containerType, "containerType cannot be null"); PropertyUtils propertyUtils = new PropertyUtils(); propertyUtils.setBeanAccess(BeanAccess.FIELD); propertyUtils.setAllowReadOnlyProperties(true); - Representer representer = new KeyValueContainerDataRepresenter(); - representer.setPropertyUtils(propertyUtils); - representer.addClassTag( - KeyValueContainerData.class, new Tag("KeyValueContainerData")); + switch(containerType) { + case KeyValueContainer: + Representer representer = new ContainerDataRepresenter(); + representer.setPropertyUtils(propertyUtils); + representer.addClassTag(KeyValueContainerData.class, + KeyValueContainerData.YAML_TAG); - Constructor keyValueDataConstructor = new KeyValueDataConstructor(); + Constructor keyValueDataConstructor = new ContainerDataConstructor(); - Yaml yaml = new Yaml(keyValueDataConstructor, representer); - - Writer writer = new OutputStreamWriter(new FileOutputStream(containerFile), - "UTF-8"); - yaml.dump(containerData, writer); - writer.close(); + Yaml yaml = new Yaml(keyValueDataConstructor, representer); + Writer writer = new OutputStreamWriter(new FileOutputStream( + containerFile), "UTF-8"); + yaml.dump(containerData, writer); + writer.close(); + break; + default: + throw new StorageContainerException("Unrecognized container Type " + + "format " + containerType, ContainerProtos.Result + .UNKNOWN_CONTAINER_TYPE); + } } /** @@ -93,57 +107,53 @@ public static void createContainerFile(File containerFile, ContainerData * @param containerFile * @throws IOException */ - public static KeyValueContainerData readContainerFile(File containerFile) + public static ContainerData readContainerFile(File containerFile) throws IOException { Preconditions.checkNotNull(containerFile, "containerFile cannot be null"); InputStream input = null; - KeyValueContainerData keyValueContainerData; + ContainerData containerData; try { PropertyUtils propertyUtils = new PropertyUtils(); propertyUtils.setBeanAccess(BeanAccess.FIELD); propertyUtils.setAllowReadOnlyProperties(true); - Representer representer = new KeyValueContainerDataRepresenter(); + Representer representer = new ContainerDataRepresenter(); representer.setPropertyUtils(propertyUtils); - representer.addClassTag( - KeyValueContainerData.class, new Tag("KeyValueContainerData")); - Constructor keyValueDataConstructor = new KeyValueDataConstructor(); + Constructor containerDataConstructor = new ContainerDataConstructor(); - Yaml yaml = new Yaml(keyValueDataConstructor, representer); + Yaml yaml = new Yaml(containerDataConstructor, representer); yaml.setBeanAccess(BeanAccess.FIELD); input = new FileInputStream(containerFile); - keyValueContainerData = (KeyValueContainerData) + containerData = (ContainerData) yaml.load(input); } finally { if (input!= null) { input.close(); } } - return keyValueContainerData; + return containerData; } /** * Representer class to define which fields need to be stored in yaml file. */ - private static class KeyValueContainerDataRepresenter extends Representer { + private static class ContainerDataRepresenter extends Representer { @Override protected Set getProperties(Class type) throws IntrospectionException { Set set = super.getProperties(type); Set filtered = new TreeSet(); + + // When a new Container type is added, we need to add what fields need + // to be filtered here if (type.equals(KeyValueContainerData.class)) { // filter properties for (Property prop : set) { String name = prop.getName(); - // When a new field needs to be added, it needs to be added here. - if (name.equals("containerType") || name.equals("containerId") || - name.equals("layOutVersion") || name.equals("state") || - name.equals("metadata") || name.equals("metadataPath") || - name.equals("chunksPath") || name.equals( - "containerDBType")) { + if (YAML_FIELDS.contains(name)) { filtered.add(prop); } } @@ -155,11 +165,12 @@ protected Set getProperties(Class type) /** * Constructor class for KeyValueData, which will be used by Yaml. */ - private static class KeyValueDataConstructor extends Constructor { - KeyValueDataConstructor() { + private static class ContainerDataConstructor extends Constructor { + ContainerDataConstructor() { //Adding our own specific constructors for tags. - this.yamlConstructors.put(new Tag("KeyValueContainerData"), - new ConstructKeyValueContainerData()); + // When a new Container type is added, we need to add yamlConstructor + // for that + this.yamlConstructors.put(YAML_TAG, new ConstructKeyValueContainerData()); this.yamlConstructors.put(Tag.INT, new ConstructLong()); } @@ -167,21 +178,14 @@ private class ConstructKeyValueContainerData extends AbstractConstruct { public Object construct(Node node) { MappingNode mnode = (MappingNode) node; Map nodes = constructMapping(mnode); - String type = (String) nodes.get("containerType"); - - ContainerProtos.ContainerType containerType = ContainerProtos - .ContainerType.KeyValueContainer; - if (type.equals("KeyValueContainer")) { - containerType = ContainerProtos.ContainerType.KeyValueContainer; - } //Needed this, as TAG.INT type is by default converted to Long. long layOutVersion = (long) nodes.get("layOutVersion"); int lv = (int) layOutVersion; //When a new field is added, it needs to be added here. - KeyValueContainerData kvData = new KeyValueContainerData(containerType, - (long) nodes.get("containerId"), lv); + KeyValueContainerData kvData = new KeyValueContainerData((long) nodes + .get("containerId"), lv); kvData.setContainerDBType((String)nodes.get("containerDBType")); kvData.setMetadataPath((String) nodes.get( "metadataPath")); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java index b5fb08de5c..c485caf6d2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.impl; import com.google.common.base.Preconditions; +import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.container.common.helpers @@ -90,6 +91,16 @@ public void init() { public void shutdown() { } + @Override + public Handler getHandler(ContainerProtos.ContainerType containerType) { + return null; + } + + @Override + public void setScmId(String scmId) { + // DO nothing, this will be removed when cleanup. + } + @Override public ContainerCommandResponseProto dispatch( ContainerCommandRequestProto msg) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index e73b761302..cbb48ec5da 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -52,24 +52,23 @@ public class HddsDispatcher implements ContainerDispatcher { private final Configuration conf; private final ContainerSet containerSet; private final VolumeSet volumeSet; - private final String scmID; + private String scmID; /** * Constructs an OzoneContainer that receives calls from * XceiverServerHandler. */ public HddsDispatcher(Configuration config, ContainerSet contSet, - VolumeSet volumes, String scmId) { - // TODO: Pass ContainerSet, VolumeSet and scmID, intialize metrics + VolumeSet volumes) { + //TODO: initialize metrics this.conf = config; this.containerSet = contSet; this.volumeSet = volumes; - this.scmID = scmId; this.handlers = Maps.newHashMap(); for (ContainerType containerType : ContainerType.values()) { handlers.put(containerType, Handler.getHandlerForContainerType( - containerType, conf, containerSet, volumeSet, scmID)); + containerType, conf, containerSet, volumeSet)); } } @@ -103,7 +102,7 @@ public ContainerCommandResponseProto dispatch( return ContainerUtils.logAndReturnError(LOG, ex, msg); } - Handler handler = getHandlerForContainerType(containerType); + Handler handler = getHandler(containerType); if (handler == null) { StorageContainerException ex = new StorageContainerException("Invalid " + "ContainerType " + containerType, @@ -113,9 +112,20 @@ public ContainerCommandResponseProto dispatch( return handler.handle(msg, container); } - @VisibleForTesting - public Handler getHandlerForContainerType(ContainerType type) { - return handlers.get(type); + @Override + public Handler getHandler(ContainerProtos.ContainerType containerType) { + return handlers.get(containerType); + } + + @Override + public void setScmId(String scmId) { + Preconditions.checkNotNull(scmId, "scmId Cannot be null"); + if (this.scmID == null) { + this.scmID = scmId; + for (Map.Entry handlerMap : handlers.entrySet()) { + handlerMap.getValue().setScmID(scmID); + } + } } private long getContainerID(ContainerCommandRequestProto request) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java index 7e12614c6e..18644bb9c0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.common.interfaces; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -48,4 +49,17 @@ public interface ContainerDispatcher { * Shutdown Dispatcher services. */ void shutdown(); + + /** + * Returns the handler for the specified containerType. + * @param containerType + * @return + */ + Handler getHandler(ContainerProtos.ContainerType containerType); + + /** + * If scmId is not set, this will set scmId, otherwise it is a no-op. + * @param scmId + */ + void setScmId(String scmId); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index d08ad74f50..8069d71788 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -31,7 +31,6 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; -import java.io.IOException; /** * Dispatcher sends ContainerCommandRequests to Handler. Each Container Type @@ -42,22 +41,20 @@ public class Handler { protected final Configuration conf; protected final ContainerSet containerSet; protected final VolumeSet volumeSet; - protected final String scmID; + protected String scmID; protected Handler(Configuration config, ContainerSet contSet, - VolumeSet volumeSet, String scmID) { + VolumeSet volumeSet) { conf = config; containerSet = contSet; this.volumeSet = volumeSet; - this.scmID = scmID; } public static Handler getHandlerForContainerType(ContainerType containerType, - Configuration config, ContainerSet contSet, VolumeSet volumeSet, - String scmID) { + Configuration config, ContainerSet contSet, VolumeSet volumeSet) { switch (containerType) { case KeyValueContainer: - return KeyValueHandler.getInstance(config, contSet, volumeSet, scmID); + return KeyValueHandler.getInstance(config, contSet, volumeSet); default: throw new IllegalArgumentException("Handler for ContainerType: " + containerType + "doesn't exist."); @@ -68,4 +65,8 @@ public ContainerCommandResponseProto handle( ContainerCommandRequestProto msg, Container container) { return null; } + + public void setScmID(String scmId) { + this.scmID = scmId; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index dc4e673126..b6a9bb92b1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -93,8 +93,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, // trick. commandDispatcher = CommandDispatcher.newBuilder() .addHandler(new CloseContainerCommandHandler()) - .addHandler(new DeleteBlocksCommandHandler( - container.getContainerManager(), conf)) + .addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(), + conf)) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java index 63f57b4845..50dea0a146 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java @@ -97,6 +97,7 @@ public BlockDeletingService(ContainerManager containerManager, OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT); } + @Override public BackgroundTaskQueue getTasks() { BackgroundTaskQueue queue = new BackgroundTaskQueue(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index f954d98f93..4fc1cd90ff 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -18,8 +18,10 @@ import com.google.common.primitives.Longs; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; @@ -29,11 +31,13 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.ozone.container.common.helpers .DeletedContainerBlocksSummary; -import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine; import org.apache.hadoop.ozone.container.common.statemachine @@ -51,6 +55,8 @@ import java.io.IOException; import java.util.List; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND; + /** * Handle block deletion commands. */ @@ -59,14 +65,14 @@ public class DeleteBlocksCommandHandler implements CommandHandler { private static final Logger LOG = LoggerFactory.getLogger(DeleteBlocksCommandHandler.class); - private ContainerManager containerManager; - private Configuration conf; + private final ContainerSet containerSet; + private final Configuration conf; private int invocationCount; private long totalTime; - public DeleteBlocksCommandHandler(ContainerManager containerManager, + public DeleteBlocksCommandHandler(ContainerSet cset, Configuration conf) { - this.containerManager = containerManager; + this.containerSet = cset; this.conf = conf; } @@ -105,8 +111,24 @@ public void handle(SCMCommand command, OzoneContainer container, DeleteBlockTransactionResult.newBuilder(); txResultBuilder.setTxID(entry.getTxID()); try { - deleteContainerBlocks(entry, conf); - txResultBuilder.setSuccess(true); + long containerId = entry.getContainerID(); + Container cont = containerSet.getContainer(containerId); + if(cont == null) { + throw new StorageContainerException("Unable to find the container " + + containerId, CONTAINER_NOT_FOUND); + } + ContainerProtos.ContainerType containerType = cont.getContainerType(); + switch (containerType) { + case KeyValueContainer: + KeyValueContainerData containerData = (KeyValueContainerData) + cont.getContainerData(); + deleteKeyValueContainerBlocks(containerData, entry); + txResultBuilder.setSuccess(true); + break; + default: + LOG.error("Delete Blocks Command Handler is not implemented for " + + "containerType {}", containerType); + } } catch (IOException e) { LOG.warn("Failed to delete blocks for container={}, TXID={}", entry.getContainerID(), entry.getTxID(), e); @@ -145,21 +167,21 @@ public void handle(SCMCommand command, OzoneContainer container, * Move a bunch of blocks from a container to deleting state. * This is a meta update, the actual deletes happen in async mode. * + * @param containerData - KeyValueContainerData * @param delTX a block deletion transaction. - * @param config configuration. * @throws IOException if I/O error occurs. */ - private void deleteContainerBlocks(DeletedBlocksTransaction delTX, - Configuration config) throws IOException { + private void deleteKeyValueContainerBlocks( + KeyValueContainerData containerData, DeletedBlocksTransaction delTX) + throws IOException { long containerId = delTX.getContainerID(); - ContainerData containerInfo = containerManager.readContainer(containerId); if (LOG.isDebugEnabled()) { LOG.debug("Processing Container : {}, DB path : {}", containerId, - containerInfo.getDBPath()); + containerData.getMetadataPath()); } int newDeletionBlocks = 0; - MetadataStore containerDB = KeyUtils.getDB(containerInfo, config); + MetadataStore containerDB = KeyUtils.getDB(containerData, conf); for (Long blk : delTX.getLocalIDList()) { BatchOperation batch = new BatchOperation(); byte[] blkBytes = Longs.toByteArray(blk); @@ -187,12 +209,12 @@ private void deleteContainerBlocks(DeletedBlocksTransaction delTX, + " container {}, skip deleting it.", blk, containerId); } containerDB.put(DFSUtil.string2Bytes( - OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + delTX.getContainerID()), + OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + containerId), Longs.toByteArray(delTX.getTxID())); } // update pending deletion blocks count in in-memory container status - containerManager.incrPendingDeletionBlocks(newDeletionBlocks, containerId); + containerData.incrPendingDeletionBlocks(newDeletionBlocks); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java index 3e11d1233c..1758c03fef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java @@ -95,7 +95,8 @@ public void execute(ExecutorService executor) { getEndPointTask(EndpointStateMachine endpoint) { switch (endpoint.getState()) { case GETVERSION: - return new VersionEndpointTask(endpoint, conf); + return new VersionEndpointTask(endpoint, conf, context.getParent() + .getContainer()); case REGISTER: return RegisterEndpointTask.newBuilder() .setConfig(conf) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java index b048ee5b5c..e4cb4d54a9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java @@ -16,14 +16,22 @@ */ package org.apache.hadoop.ozone.container.common.states.endpoint; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.VersionResponse; import java.io.IOException; +import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; /** @@ -33,11 +41,13 @@ public class VersionEndpointTask implements Callable { private final EndpointStateMachine rpcEndPoint; private final Configuration configuration; + private final OzoneContainer ozoneContainer; public VersionEndpointTask(EndpointStateMachine rpcEndPoint, - Configuration conf) { + Configuration conf, OzoneContainer container) { this.rpcEndPoint = rpcEndPoint; this.configuration = conf; + this.ozoneContainer = container; } /** @@ -52,7 +62,27 @@ public EndpointStateMachine.EndPointStates call() throws Exception { try{ SCMVersionResponseProto versionResponse = rpcEndPoint.getEndPoint().getVersion(null); - rpcEndPoint.setVersion(VersionResponse.getFromProtobuf(versionResponse)); + VersionResponse response = VersionResponse.getFromProtobuf( + versionResponse); + rpcEndPoint.setVersion(response); + VolumeSet volumeSet = ozoneContainer.getVolumeSet(); + Map volumeMap = volumeSet.getVolumeMap(); + List keyValues = versionResponse.getKeysList(); + + String scmId = response.getValue(OzoneConsts.SCM_ID); + String clusterId = response.getValue(OzoneConsts.CLUSTER_ID); + + Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " + + "null"); + Preconditions.checkNotNull(scmId, "Reply from SCM: clusterId cannot be" + + " null"); + + // If version file does not exist create version file and also set scmId + for (Map.Entry entry : volumeMap.entrySet()) { + HddsVolume hddsVolume = entry.getValue(); + hddsVolume.format(clusterId); + ozoneContainer.getDispatcher().setScmId(scmId); + } EndpointStateMachine.EndPointStates nextState = rpcEndPoint.getState().getNextState(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java index 788e2cf3a5..900613321d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java @@ -130,6 +130,10 @@ private HddsVolume(Builder b) throws IOException { initialize(); } + public VolumeInfo getVolumeInfo() { + return volumeInfo; + } + /** * Initializes the volume. * Creates the Version file if not present, @@ -327,4 +331,6 @@ public enum VolumeState { public void setScmUsageForTesting(GetSpaceUsed scmUsageForTest) { volumeInfo.setScmUsageForTesting(scmUsageForTest); } + + } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java index 9e052b037a..e35becd9bd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java @@ -27,8 +27,13 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.InconsistentStorageStateException; +import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil; import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; @@ -309,4 +314,47 @@ public Map getVolumeMap() { public Map> getVolumeStateMap() { return ImmutableMap.copyOf(volumeStateMap); } + + public StorageContainerDatanodeProtocolProtos.NodeReportProto getNodeReport() + throws IOException { + boolean failed; + StorageLocationReport[] reports = + new StorageLocationReport[volumeMap.size()]; + int counter = 0; + for (Map.Entry entry : volumeMap.entrySet()) { + HddsVolume hddsVolume = entry.getValue(); + VolumeInfo volumeInfo = hddsVolume.getVolumeInfo(); + long scmUsed = 0; + long remaining = 0; + failed = false; + try { + scmUsed = volumeInfo.getScmUsed(); + remaining = volumeInfo.getAvailable(); + } catch (IOException ex) { + LOG.warn("Failed to get scmUsed and remaining for container " + + "storage location {}", volumeInfo.getRootDir()); + // reset scmUsed and remaining if df/du failed. + scmUsed = 0; + remaining = 0; + failed = true; + } + + StorageLocationReport.Builder builder = + StorageLocationReport.newBuilder(); + builder.setStorageLocation(volumeInfo.getRootDir()) + .setId(hddsVolume.getStorageID()) + .setFailed(failed) + .setCapacity(hddsVolume.getCapacity()) + .setRemaining(remaining) + .setScmUsed(scmUsed) + .setStorageType(hddsVolume.getStorageType()); + StorageLocationReport r = builder.build(); + reports[counter++] = r; + } + NodeReportProto.Builder nrb = NodeReportProto.newBuilder(); + for (int i = 0; i < reports.length; i++) { + nrb.addStorageReport(reports[i].getProtoBufMessage()); + } + return nrb.build(); + } } \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index a1cbb4ebb2..553e3f5d7a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.container.keyvalue; import com.google.common.base.Preconditions; -import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileUtil; @@ -33,6 +32,7 @@ import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.interfaces.Container; @@ -47,21 +47,16 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Writer; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.CONTAINER_ALREADY_EXISTS; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.CONTAINER_CHECKSUM_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.CONTAINER_METADATA_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -74,8 +69,6 @@ .Result.ERROR_IN_COMPACT_DB; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.INVALID_CONTAINER_STATE; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Result.NO_SUCH_ALGORITHM; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .Result.UNSUPPORTED_REQUEST; @@ -198,10 +191,12 @@ private void createContainerFile(File containerFile, File try { tempContainerFile = createTempFile(containerFile); tempCheckSumFile = createTempFile(containerCheckSumFile); - KeyValueYaml.createContainerFile(tempContainerFile, containerData); + ContainerDataYaml.createContainerFile(ContainerProtos.ContainerType + .KeyValueContainer, tempContainerFile, containerData); //Compute Checksum for container file - String checksum = computeCheckSum(tempContainerFile); + String checksum = KeyValueContainerUtil.computeCheckSum(containerId, + tempContainerFile); containerCheckSumStream = new FileOutputStream(tempCheckSumFile); writer = new OutputStreamWriter(containerCheckSumStream, "UTF-8"); writer.write(checksum); @@ -308,43 +303,6 @@ private void updateContainerFile(File containerFile, File } - /** - * Compute checksum of the .container file. - * @param containerFile - * @throws StorageContainerException - */ - private String computeCheckSum(File containerFile) throws - StorageContainerException { - - MessageDigest sha; - FileInputStream containerFileStream = null; - try { - sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); - } catch (NoSuchAlgorithmException e) { - throw new StorageContainerException("Unable to create Message Digest," - + " usually this is a java configuration issue.", - NO_SUCH_ALGORITHM); - } - - try { - containerFileStream = new FileInputStream(containerFile); - byte[] byteArray = new byte[1024]; - int bytesCount = 0; - - while ((bytesCount = containerFileStream.read(byteArray)) != -1) { - sha.update(byteArray, 0, bytesCount); - } - String checksum = DigestUtils.sha256Hex(sha.digest()); - return checksum; - } catch (IOException ex) { - throw new StorageContainerException("Error during update of " + - "check sum file. Container Name: " + containerData.getContainerId(), - ex, CONTAINER_CHECKSUM_ERROR); - } finally { - IOUtils.closeStream(containerFileStream); - } - } - @Override public void delete(boolean forceDelete) throws StorageContainerException { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java index 8da408413b..3b244684a8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java @@ -18,12 +18,14 @@ package org.apache.hadoop.ozone.container.keyvalue; +import com.google.common.collect.Lists; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.yaml.snakeyaml.nodes.Tag; import java.io.File; -import java.io.IOException; +import java.util.List; import java.util.Map; /** @@ -33,6 +35,14 @@ */ public class KeyValueContainerData extends ContainerData { + // Yaml Tag used for KeyValueContainerData. + public static final Tag YAML_TAG = new Tag("KeyValueContainerData"); + + // Fields need to be stored in .container file. + public static final List YAML_FIELDS = Lists.newArrayList( + "containerType", "containerId", "layOutVersion", "state", "metadata", + "metadataPath", "chunksPath", "containerDBType"); + // Path to Container metadata Level DB/RocksDB Store and .container file. private String metadataPath; @@ -49,23 +59,21 @@ public class KeyValueContainerData extends ContainerData { /** * Constructs KeyValueContainerData object. - * @param type - containerType * @param id - ContainerId */ - public KeyValueContainerData(ContainerProtos.ContainerType type, long id) { - super(type, id); + public KeyValueContainerData(long id) { + super(ContainerProtos.ContainerType.KeyValueContainer, id); this.numPendingDeletionBlocks = 0; } /** * Constructs KeyValueContainerData object. - * @param type - containerType * @param id - ContainerId * @param layOutVersion */ - public KeyValueContainerData(ContainerProtos.ContainerType type, long id, + public KeyValueContainerData(long id, int layOutVersion) { - super(type, id, layOutVersion); + super(ContainerProtos.ContainerType.KeyValueContainer, id, layOutVersion); this.numPendingDeletionBlocks = 0; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index d9ee7fdb45..ffe0f21cd4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.keyvalue; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import com.sun.jersey.spi.resource.Singleton; @@ -93,16 +94,16 @@ public class KeyValueHandler extends Handler { // TODO : Add metrics and populate it. public static KeyValueHandler getInstance(Configuration config, - ContainerSet contSet, VolumeSet volSet, String scmID) { + ContainerSet contSet, VolumeSet volSet) { if (INSTANCE == null) { - INSTANCE = new KeyValueHandler(config, contSet, volSet, scmID); + INSTANCE = new KeyValueHandler(config, contSet, volSet); } return INSTANCE; } private KeyValueHandler(Configuration config, ContainerSet contSet, - VolumeSet volSet, String scmID) { - super(config, contSet, volSet, scmID); + VolumeSet volSet) { + super(config, contSet, volSet); containerType = ContainerType.KeyValueContainer; keyManager = new KeyManagerImpl(config); chunkManager = new ChunkManagerImpl(); @@ -156,6 +157,16 @@ public ContainerCommandResponseProto handle( return null; } + @VisibleForTesting + public ChunkManager getChunkManager() { + return this.chunkManager; + } + + @VisibleForTesting + public KeyManager getKeyManager() { + return this.keyManager; + } + /** * Handles Create Container Request. If successful, adds the container to * ContainerSet. @@ -180,7 +191,7 @@ ContainerCommandResponseProto handleCreateContainer( } KeyValueContainerData newContainerData = new KeyValueContainerData( - containerType, containerID); + containerID); // TODO: Add support to add metadataList to ContainerData. Add metadata // to container during creation. KeyValueContainer newContainer = new KeyValueContainer( @@ -262,7 +273,6 @@ ContainerCommandResponseProto handleDeleteContainer( boolean forceDelete = request.getDeleteContainer().getForceDelete(); kvContainer.writeLock(); - try { // Check if container is open if (kvContainer.getContainerData().isOpen()) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java index b868f1d56c..029e94ddef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java @@ -18,9 +18,11 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers; import com.google.common.base.Preconditions; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; @@ -28,15 +30,29 @@ .ContainerCommandResponseProto; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.utils.MetadataKeyFilters; import org.apache.hadoop.utils.MetadataStore; import org.apache.hadoop.utils.MetadataStoreBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*; /** * Class which defines utility methods for KeyValueContainer. @@ -170,4 +186,122 @@ public static ContainerCommandResponseProto getReadContainerResponse( builder.setReadContainer(response); return builder.build(); } + + /** + * Compute checksum of the .container file. + * @param containerId + * @param containerFile + * @throws StorageContainerException + */ + public static String computeCheckSum(long containerId, File + containerFile) throws StorageContainerException { + Preconditions.checkNotNull(containerFile, "containerFile cannot be null"); + MessageDigest sha; + FileInputStream containerFileStream = null; + try { + sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + } catch (NoSuchAlgorithmException e) { + throw new StorageContainerException("Unable to create Message Digest, " + + "usually this is a java configuration issue.", NO_SUCH_ALGORITHM); + } + try { + containerFileStream = new FileInputStream(containerFile); + byte[] byteArray = new byte[1024]; + int bytesCount = 0; + while ((bytesCount = containerFileStream.read(byteArray)) != -1) { + sha.update(byteArray, 0, bytesCount); + } + String checksum = DigestUtils.sha256Hex(sha.digest()); + return checksum; + } catch (IOException ex) { + throw new StorageContainerException("Error during computing checksum: " + + "for container " + containerId, ex, CONTAINER_CHECKSUM_ERROR); + } finally { + IOUtils.closeStream(containerFileStream); + } + } + + /** + * Verify checksum of the container. + * @param containerId + * @param checksumFile + * @param checksum + * @throws StorageContainerException + */ + public static void verifyCheckSum(long containerId, File checksumFile, + String checksum) + throws StorageContainerException { + try { + Preconditions.checkNotNull(checksum); + Preconditions.checkNotNull(checksumFile); + Path path = Paths.get(checksumFile.getAbsolutePath()); + List fileCheckSum = Files.readAllLines(path); + Preconditions.checkState(fileCheckSum.size() == 1, "checksum " + + "should be 32 byte string"); + if (!checksum.equals(fileCheckSum.get(0))) { + LOG.error("Checksum mismatch for the container {}", containerId); + throw new StorageContainerException("Checksum mismatch for " + + "the container " + containerId, CHECKSUM_MISMATCH); + } + } catch (StorageContainerException ex) { + throw ex; + } catch (IOException ex) { + LOG.error("Error during verify checksum for container {}", containerId); + throw new StorageContainerException("Error during verify checksum" + + " for container " + containerId, IO_EXCEPTION); + } + } + + /** + * Parse KeyValueContainerData and verify checksum. + * @param containerData + * @param containerFile + * @param checksumFile + * @param dbFile + * @param config + * @throws IOException + */ + public static void parseKeyValueContainerData( + KeyValueContainerData containerData, File containerFile, File + checksumFile, File dbFile, OzoneConfiguration config) throws IOException { + + Preconditions.checkNotNull(containerData, "containerData cannot be null"); + Preconditions.checkNotNull(containerFile, "containerFile cannot be null"); + Preconditions.checkNotNull(checksumFile, "checksumFile cannot be null"); + Preconditions.checkNotNull(dbFile, "dbFile cannot be null"); + Preconditions.checkNotNull(config, "ozone config cannot be null"); + + long containerId = containerData.getContainerId(); + String containerName = String.valueOf(containerId); + File metadataPath = new File(containerData.getMetadataPath()); + + Preconditions.checkNotNull(containerName, "container Name cannot be " + + "null"); + Preconditions.checkNotNull(metadataPath, "metadata path cannot be " + + "null"); + + // Verify Checksum + String checksum = KeyValueContainerUtil.computeCheckSum( + containerData.getContainerId(), containerFile); + KeyValueContainerUtil.verifyCheckSum(containerId, checksumFile, checksum); + + containerData.setDbFile(dbFile); + + MetadataStore metadata = KeyUtils.getDB(containerData, config); + long bytesUsed = 0; + List> liveKeys = metadata + .getRangeKVs(null, Integer.MAX_VALUE, + MetadataKeyFilters.getNormalKeyFilter()); + bytesUsed = liveKeys.parallelStream().mapToLong(e-> { + KeyData keyData; + try { + keyData = KeyUtils.getKeyData(e.getValue()); + return keyData.getSize(); + } catch (IOException ex) { + return 0L; + } + }).sum(); + containerData.setBytesUsed(bytesUsed); + } + } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java new file mode 100644 index 0000000000..68823bcf96 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.ozoneimpl; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.common.Storage; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; +import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil; +import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; + + +/** + * Class used to read .container files from Volume and build container map. + */ +public class ContainerReader implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger( + ContainerReader.class); + private File hddsVolumeDir; + private final ContainerSet containerSet; + private final OzoneConfiguration config; + + ContainerReader(File volumeRoot, ContainerSet cset, OzoneConfiguration conf) { + Preconditions.checkNotNull(volumeRoot); + this.hddsVolumeDir = volumeRoot; + this.containerSet = cset; + this.config = conf; + } + + @Override + public void run() { + try { + readVolume(hddsVolumeDir); + } catch (RuntimeException ex) { + LOG.info("Caught an Run time exception during reading container files" + + " from Volume {}", hddsVolumeDir); + } + } + + public void readVolume(File hddsVolumeRootDir) { + Preconditions.checkNotNull(hddsVolumeRootDir, "hddsVolumeRootDir" + + "cannot be null"); + + + /** + * + * layout of the container directory on the disk. + * /hdds/<>/current/<>//metadata + * /<>.container + * /hdds/<>/current/<>/<>/metadata + * /<>.checksum + * /hdds/<>/current/<>/<>/metadata + * /<>.db + * /hdds/<>/current/<>/<>/chunks + * /<> + * + **/ + + //filtering scm directory + File[] scmDir = hddsVolumeRootDir.listFiles(new FileFilter() { + @Override + public boolean accept(File pathname) { + return pathname.isDirectory(); + } + }); + + for (File scmLoc : scmDir) { + File currentDir = null; + currentDir = new File(scmLoc, Storage.STORAGE_DIR_CURRENT); + File[] containerTopDirs = currentDir.listFiles(); + if (containerTopDirs != null) { + for (File containerTopDir : containerTopDirs) { + if (containerTopDir.isDirectory()) { + File[] containerDirs = containerTopDir.listFiles(); + for (File containerDir : containerDirs) { + File metadataPath = new File(containerDir + File.separator + + OzoneConsts.CONTAINER_META_PATH); + String containerName = containerDir.getName(); + if (metadataPath.exists()) { + File containerFile = KeyValueContainerLocationUtil + .getContainerFile(metadataPath, containerName); + File checksumFile = KeyValueContainerLocationUtil + .getContainerCheckSumFile(metadataPath, containerName); + File dbFile = KeyValueContainerLocationUtil + .getContainerDBFile(metadataPath, containerName); + if (containerFile.exists() && checksumFile.exists() && + dbFile.exists()) { + verifyContainerFile(containerFile, checksumFile, dbFile); + } else { + LOG.error("Missing container metadata files for Container: " + + "{}", containerName); + } + } else { + LOG.error("Missing container metadata directory for " + + "Container: {}", containerName); + } + } + } + } + } + } + } + + private void verifyContainerFile(File containerFile, File checksumFile, + File dbFile) { + try { + ContainerData containerData = ContainerDataYaml.readContainerFile( + containerFile); + + switch (containerData.getContainerType()) { + case KeyValueContainer: + KeyValueContainerData keyValueContainerData = (KeyValueContainerData) + containerData; + KeyValueContainerUtil.parseKeyValueContainerData(keyValueContainerData, + containerFile, checksumFile, dbFile, config); + KeyValueContainer keyValueContainer = new KeyValueContainer( + keyValueContainerData, config); + containerSet.addContainer(keyValueContainer); + break; + default: + LOG.error("Unrecognized ContainerType {} format during verify " + + "ContainerFile", containerData.getContainerType()); + } + } catch (IOException ex) { + LOG.error("Error during reading container file {}", containerFile); + } + } + +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 4156f5a4e0..9e25c59a47 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -1,72 +1,49 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.hadoop.ozone.container.ozoneimpl; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; -import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl; -import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl; -import org.apache.hadoop.ozone.container.common.impl.Dispatcher; -import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl; -import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; -import org.apache.hadoop.ozone.container.common.interfaces.KeyManager; -import org.apache.hadoop.ozone.container.common.statemachine.background - .BlockDeletingService; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer; -import org.apache.hadoop.ozone.container.common.transport.server - .XceiverServerGrpc; -import org.apache.hadoop.ozone.container.common.transport.server - .XceiverServerSpi; -import org.apache.hadoop.ozone.container.common.transport.server.ratis - .XceiverServerRatis; +import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc; +import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.VolumeSet; + +import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.file.Paths; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; +import java.io.*; +import java.util.ArrayList; +import java.util.Iterator; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_INTERVAL; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ROOT_PREFIX; import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; /** @@ -74,69 +51,75 @@ * layer. */ public class OzoneContainer { - public static final Logger LOG = - LoggerFactory.getLogger(OzoneContainer.class); - private final Configuration ozoneConfig; - private final ContainerDispatcher dispatcher; - private final ContainerManager manager; + public static final Logger LOG = LoggerFactory.getLogger( + OzoneContainer.class); + + private final HddsDispatcher hddsDispatcher; + private final DatanodeDetails dnDetails; + private final OzoneConfiguration config; + private final VolumeSet volumeSet; + private final ContainerSet containerSet; private final XceiverServerSpi[] server; - private final ChunkManager chunkManager; - private final KeyManager keyManager; - private final BlockDeletingService blockDeletingService; /** - * Creates a network endpoint and enables Ozone container. - * - * @param ozoneConfig - Config + * Construct OzoneContainer object. + * @param datanodeDetails + * @param conf + * @throws DiskOutOfSpaceException * @throws IOException */ - public OzoneContainer( - DatanodeDetails datanodeDetails, Configuration ozoneConfig) - throws IOException { - this.ozoneConfig = ozoneConfig; - List locations = new LinkedList<>(); - String[] paths = ozoneConfig.getStrings( - OzoneConfigKeys.OZONE_METADATA_DIRS); - if (paths != null && paths.length > 0) { - for (String p : paths) { - locations.add(StorageLocation.parse( - Paths.get(p).resolve(CONTAINER_ROOT_PREFIX).toString())); - } - } else { - getDataDir(locations); - } - - manager = new ContainerManagerImpl(); - manager.init(this.ozoneConfig, locations, datanodeDetails); - this.chunkManager = new ChunkManagerImpl(manager); - manager.setChunkManager(this.chunkManager); - - this.keyManager = new KeyManagerImpl(manager, ozoneConfig); - manager.setKeyManager(this.keyManager); - - long svcInterval = - ozoneConfig.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, - OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); - long serviceTimeout = ozoneConfig.getTimeDuration( - OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, - OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); - this.blockDeletingService = new BlockDeletingService(manager, - svcInterval, serviceTimeout, ozoneConfig); - - this.dispatcher = new Dispatcher(manager, this.ozoneConfig); - - boolean useGrpc = this.ozoneConfig.getBoolean( + public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration + conf) throws IOException { + this.dnDetails = datanodeDetails; + this.config = conf; + this.volumeSet = new VolumeSet(datanodeDetails, conf); + this.containerSet = new ContainerSet(); + boolean useGrpc = this.config.getBoolean( ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY, ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT); + buildContainerSet(); + hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet); server = new XceiverServerSpi[]{ - useGrpc ? new XceiverServerGrpc(datanodeDetails, - this.ozoneConfig, this.dispatcher) : + useGrpc ? new XceiverServerGrpc(datanodeDetails, this.config, this + .hddsDispatcher) : new XceiverServer(datanodeDetails, - this.ozoneConfig, this.dispatcher), - XceiverServerRatis - .newXceiverServerRatis(datanodeDetails, this.ozoneConfig, dispatcher) + this.config, this.hddsDispatcher), + XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this + .config, hddsDispatcher) }; + + + } + + + /** + * Build's container map. + */ + public void buildContainerSet() { + Iterator volumeSetIterator = volumeSet.getVolumesList() + .iterator(); + ArrayList volumeThreads = new ArrayList(); + + //TODO: diskchecker should be run before this, to see how disks are. + // And also handle disk failure tolerance need to be added + while (volumeSetIterator.hasNext()) { + HddsVolume volume = volumeSetIterator.next(); + File hddsVolumeRootDir = volume.getHddsRootDir(); + Thread thread = new Thread(new ContainerReader(hddsVolumeRootDir, + containerSet, config)); + thread.start(); + volumeThreads.add(thread); + } + + try { + for (int i = 0; i < volumeThreads.size(); i++) { + volumeThreads.get(i).join(); + } + } catch (InterruptedException ex) { + LOG.info("Volume Threads Interrupted exception", ex); + } + } /** @@ -145,155 +128,46 @@ public OzoneContainer( * @throws IOException */ public void start() throws IOException { + LOG.info("Attempting to start container services."); for (XceiverServerSpi serverinstance : server) { serverinstance.start(); } - blockDeletingService.start(); - dispatcher.init(); + hddsDispatcher.init(); } /** - * Stops the ozone container. - *

- * Shutdown logic is not very obvious from the following code. if you need to - * modify the logic, please keep these comments in mind. Here is the shutdown - * sequence. - *

- * 1. We shutdown the network ports. - *

- * 2. Now we need to wait for all requests in-flight to finish. - *

- * 3. The container manager lock is a read-write lock with "Fairness" - * enabled. - *

- * 4. This means that the waiting threads are served in a "first-come-first - * -served" manner. Please note that this applies to waiting threads only. - *

- * 5. Since write locks are exclusive, if we are waiting to get a lock it - * implies that we are waiting for in-flight operations to complete. - *

- * 6. if there are other write operations waiting on the reader-writer lock, - * fairness guarantees that they will proceed before the shutdown lock - * request. - *

- * 7. Since all operations either take a reader or writer lock of container - * manager, we are guaranteed that we are the last operation since we have - * closed the network port, and we wait until close is successful. - *

- * 8. We take the writer lock and call shutdown on each of the managers in - * reverse order. That is chunkManager, keyManager and containerManager is - * shutdown. + * Stop Container Service on the datanode. */ public void stop() { + //TODO: at end of container IO integration work. LOG.info("Attempting to stop container services."); for(XceiverServerSpi serverinstance: server) { serverinstance.stop(); } - dispatcher.shutdown(); - - try { - this.manager.writeLock(); - this.chunkManager.shutdown(); - this.keyManager.shutdown(); - this.manager.shutdown(); - this.blockDeletingService.shutdown(); - LOG.info("container services shutdown complete."); - } catch (IOException ex) { - LOG.warn("container service shutdown error:", ex); - } finally { - this.manager.writeUnlock(); - } + hddsDispatcher.shutdown(); } - /** - * Returns a paths to data dirs. - * - * @param pathList - List of paths. - * @throws IOException - */ - private void getDataDir(List pathList) throws IOException { - for (String dir : ozoneConfig.getStrings(DFS_DATANODE_DATA_DIR_KEY)) { - StorageLocation location = StorageLocation.parse(dir); - pathList.add(location); - } - } - /** - * Returns node report of container storage usage. - */ - public NodeReportProto getNodeReport() throws IOException { - return this.manager.getNodeReport(); + @VisibleForTesting + public ContainerSet getContainerSet() { + return containerSet; } - - private int getPortbyType(HddsProtos.ReplicationType replicationType) { - for (XceiverServerSpi serverinstance : server) { - if (serverinstance.getServerType() == replicationType) { - return serverinstance.getIPCPort(); - } - } - return INVALID_PORT; - } - - /** - * Returns the container server IPC port. - * - * @return Container server IPC port. - */ - public int getContainerServerPort() { - return getPortbyType(HddsProtos.ReplicationType.STAND_ALONE); - } - - /** - * Returns the Ratis container Server IPC port. - * - * @return Ratis port. - */ - public int getRatisContainerServerPort() { - return getPortbyType(HddsProtos.ReplicationType.RATIS); - } - /** * Returns container report. * @return - container report. * @throws IOException */ - public ContainerReportsProto getContainerReport() throws IOException { - return this.manager.getContainerReport(); + public StorageContainerDatanodeProtocolProtos.ContainerReportsProto + getContainerReport() throws IOException { + return this.containerSet.getContainerReport(); } -// TODO: remove getContainerReports /** - * Returns the list of closed containers. - * @return - List of closed containers. + * Submit ContainerRequest. + * @param request + * @param replicationType * @throws IOException */ - public List getClosedContainerReports() throws IOException { - return this.manager.getClosedContainerReports(); - } - - private XceiverServerSpi getRatisSerer() { - for (XceiverServerSpi serverInstance : server) { - if (serverInstance instanceof XceiverServerRatis) { - return serverInstance; - } - } - return null; - } - - private XceiverServerSpi getStandaAloneSerer() { - for (XceiverServerSpi serverInstance : server) { - if (!(serverInstance instanceof XceiverServerRatis)) { - return serverInstance; - } - } - return null; - } - - @VisibleForTesting - public ContainerManager getContainerManager() { - return this.manager; - } - public void submitContainerRequest( ContainerProtos.ContainerCommandRequestProto request, HddsProtos.ReplicationType replicationType) throws IOException { @@ -332,4 +206,66 @@ private long getContainerIdForCmd( + " not supported over HearBeat Response"); } } -} \ No newline at end of file + + private XceiverServerSpi getRatisSerer() { + for (XceiverServerSpi serverInstance : server) { + if (serverInstance instanceof XceiverServerRatis) { + return serverInstance; + } + } + return null; + } + + private XceiverServerSpi getStandaAloneSerer() { + for (XceiverServerSpi serverInstance : server) { + if (!(serverInstance instanceof XceiverServerRatis)) { + return serverInstance; + } + } + return null; + } + + private int getPortbyType(HddsProtos.ReplicationType replicationType) { + for (XceiverServerSpi serverinstance : server) { + if (serverinstance.getServerType() == replicationType) { + return serverinstance.getIPCPort(); + } + } + return INVALID_PORT; + } + + /** + * Returns the container server IPC port. + * + * @return Container server IPC port. + */ + public int getContainerServerPort() { + return getPortbyType(HddsProtos.ReplicationType.STAND_ALONE); + } + + /** + * Returns the Ratis container Server IPC port. + * + * @return Ratis port. + */ + public int getRatisContainerServerPort() { + return getPortbyType(HddsProtos.ReplicationType.RATIS); + } + + /** + * Returns node report of container storage usage. + */ + public StorageContainerDatanodeProtocolProtos.NodeReportProto getNodeReport() + throws IOException { + return volumeSet.getNodeReport(); + } + + @VisibleForTesting + public ContainerDispatcher getDispatcher() { + return this.hddsDispatcher; + } + + public VolumeSet getVolumeSet() { + return volumeSet; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java index 83acf5bd6e..4d328d3d1e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java @@ -105,6 +105,10 @@ public SCMVersionResponseProto getProtobufMessage() { .addAllKeys(list).build(); } + public String getValue(String key) { + return this.values.get(key); + } + /** * Builder class. */ diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java index b63c5fbe9b..a24f096ddb 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java @@ -25,16 +25,20 @@ .StorageContainerDatanodeProtocolService; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; import org.apache.hadoop.ozone.protocolPB .StorageContainerDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.test.GenericTestUtils; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY; + /** * Test Endpoint class. */ @@ -109,8 +113,13 @@ public static InetSocketAddress getReuseableAddress() throws IOException { } } - public static Configuration getConf() { - return new Configuration(); + public static OzoneConfiguration getConf() { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set(HDDS_DATANODE_DIR_KEY, GenericTestUtils + .getRandomizedTempPath()); + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, GenericTestUtils + .getRandomizedTempPath()); + return conf; } public static OzoneConfiguration getOzoneConf() { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java index 14da9601e4..8f4b0e3fb0 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java @@ -38,6 +38,7 @@ .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import org.apache.hadoop.ozone.protocol.VersionResponse; @@ -151,7 +152,10 @@ public long getBytesUsed() { return VersionResponse.newBuilder() .setVersion(versionInfo.getVersion()) .addValue(VersionInfo.DESCRIPTION_KEY, versionInfo.getDescription()) + .addValue(OzoneConsts.SCM_ID, UUID.randomUUID().toString()) + .addValue(OzoneConsts.CLUSTER_ID, UUID.randomUUID().toString()) .build().getProtobufMessage(); + } private void sleepIfNeeded() { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java index 52f291b48b..249b0febaf 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java @@ -42,8 +42,7 @@ public void testKeyValueData() { .ContainerLifeCycleState.CLOSED; AtomicLong val = new AtomicLong(0); - KeyValueContainerData kvData = new KeyValueContainerData(containerType, - containerId); + KeyValueContainerData kvData = new KeyValueContainerData(containerId); assertEquals(containerType, kvData.getContainerType()); assertEquals(containerId, kvData.getContainerId()); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestKeyValueYaml.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java similarity index 88% rename from hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestKeyValueYaml.java rename to hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java index 75c0139080..e1b7bd2abd 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestKeyValueYaml.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java @@ -22,7 +22,6 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; -import org.apache.hadoop.ozone.container.keyvalue.KeyValueYaml; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Test; @@ -36,7 +35,7 @@ /** * This class tests create/read .container files. */ -public class TestKeyValueYaml { +public class TestContainerDataYaml { @Test public void testCreateContainerFile() throws IOException { @@ -46,8 +45,7 @@ public void testCreateContainerFile() throws IOException { File filePath = new File(new FileSystemTestHelper().getTestRootDir()); filePath.mkdirs(); - KeyValueContainerData keyValueContainerData = new KeyValueContainerData( - ContainerProtos.ContainerType.KeyValueContainer, Long.MAX_VALUE); + KeyValueContainerData keyValueContainerData = new KeyValueContainerData(Long.MAX_VALUE); keyValueContainerData.setContainerDBType("RocksDB"); keyValueContainerData.setMetadataPath(path); keyValueContainerData.setChunksPath(path); @@ -55,14 +53,15 @@ public void testCreateContainerFile() throws IOException { File containerFile = new File(filePath, containerPath); // Create .container file with ContainerData - KeyValueYaml.createContainerFile(containerFile, keyValueContainerData); + ContainerDataYaml.createContainerFile(ContainerProtos.ContainerType + .KeyValueContainer, containerFile, keyValueContainerData); //Check .container file exists or not. assertTrue(containerFile.exists()); // Read from .container file, and verify data. - KeyValueContainerData kvData = KeyValueYaml.readContainerFile( - containerFile); + KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml + .readContainerFile(containerFile); assertEquals(Long.MAX_VALUE, kvData.getContainerId()); assertEquals(ContainerProtos.ContainerType.KeyValueContainer, kvData .getContainerType()); @@ -82,10 +81,12 @@ public void testCreateContainerFile() throws IOException { // Update .container file with new ContainerData. containerFile = new File(filePath, containerPath); - KeyValueYaml.createContainerFile(containerFile, kvData); + ContainerDataYaml.createContainerFile(ContainerProtos.ContainerType + .KeyValueContainer, containerFile, kvData); // Reading newly updated data from .container file - kvData = KeyValueYaml.readContainerFile(containerFile); + kvData = (KeyValueContainerData) ContainerDataYaml.readContainerFile( + containerFile); // verify data. assertEquals(Long.MAX_VALUE, kvData.getContainerId()); @@ -113,7 +114,8 @@ public void testIncorrectContainerFile() throws IOException{ //Get file from resources folder ClassLoader classLoader = getClass().getClassLoader(); File file = new File(classLoader.getResource(path).getFile()); - KeyValueContainerData kvData = KeyValueYaml.readContainerFile(file); + KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml + .readContainerFile(file); fail("testIncorrectContainerFile failed"); } catch (IllegalStateException ex) { GenericTestUtils.assertExceptionContains("Unexpected " + @@ -135,7 +137,8 @@ public void testCheckBackWardCompatabilityOfContainerFile() throws //Get file from resources folder ClassLoader classLoader = getClass().getClassLoader(); File file = new File(classLoader.getResource(path).getFile()); - KeyValueContainerData kvData = KeyValueYaml.readContainerFile(file); + KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml + .readContainerFile(file); //Checking the Container file data is consistent or not assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED, kvData diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java index 5a29e8a070..55d67738f0 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java @@ -53,8 +53,7 @@ public void testAddGetRemoveContainer() throws StorageContainerException { ContainerProtos.ContainerLifeCycleState state = ContainerProtos .ContainerLifeCycleState.CLOSED; - KeyValueContainerData kvData = new KeyValueContainerData( - ContainerProtos.ContainerType.KeyValueContainer, containerId); + KeyValueContainerData kvData = new KeyValueContainerData(containerId); kvData.setState(state); KeyValueContainer keyValueContainer = new KeyValueContainer(kvData, new OzoneConfiguration()); @@ -164,8 +163,7 @@ public void testListContainer() throws StorageContainerException { private ContainerSet createContainerSet() throws StorageContainerException { ContainerSet containerSet = new ContainerSet(); for (int i=0; i<10; i++) { - KeyValueContainerData kvData = new KeyValueContainerData( - ContainerProtos.ContainerType.KeyValueContainer, i); + KeyValueContainerData kvData = new KeyValueContainerData(i); if (i%2 == 0) { kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSED); } else { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java index 50927d144b..6660e9b5b5 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java @@ -52,7 +52,6 @@ public class TestHandler { private VolumeSet volumeSet; private Handler handler; - private final static String SCM_ID = UUID.randomUUID().toString(); private final static String DATANODE_UUID = UUID.randomUUID().toString(); @Before @@ -61,12 +60,12 @@ public void setup() throws Exception { this.containerSet = Mockito.mock(ContainerSet.class); this.volumeSet = Mockito.mock(VolumeSet.class); - this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, SCM_ID); + this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet); } @Test public void testGetKeyValueHandler() throws Exception { - Handler kvHandler = dispatcher.getHandlerForContainerType( + Handler kvHandler = dispatcher.getHandler( ContainerProtos.ContainerType.KeyValueContainer); Assert.assertTrue("getHandlerForContainerType returned incorrect handler", @@ -83,8 +82,7 @@ public void testGetHandlerForInvalidContainerType() { Assert.assertEquals("New ContainerType detected. Not an invalid " + "containerType", invalidContainerType, null); - Handler handler = dispatcher.getHandlerForContainerType( - invalidContainerType); + Handler handler = dispatcher.getHandler(invalidContainerType); Assert.assertEquals("Get Handler for Invalid ContainerType should " + "return null.", handler, null); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java index 4576db6b13..272bdb9e7f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java @@ -81,8 +81,7 @@ public void setUp() throws Exception { Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong())) .thenReturn(hddsVolume); - keyValueContainerData = new KeyValueContainerData( - ContainerProtos.ContainerType.KeyValueContainer, 1L); + keyValueContainerData = new KeyValueContainerData(1L); keyValueContainer = new KeyValueContainer( keyValueContainerData, config); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java index 722cece957..fa7c66d7a5 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java @@ -79,8 +79,7 @@ public void setUp() throws Exception { Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong())) .thenReturn(hddsVolume); - keyValueContainerData = new KeyValueContainerData( - ContainerProtos.ContainerType.KeyValueContainer, 1L); + keyValueContainerData = new KeyValueContainerData(1L); keyValueContainer = new KeyValueContainer( keyValueContainerData, config); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java index 006b82c613..de5f432e37 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; +import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume .RoundRobinVolumeChoosingPolicy; @@ -85,8 +86,7 @@ public void setUp() throws Exception { Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong())) .thenReturn(hddsVolume); - keyValueContainerData = new KeyValueContainerData( - ContainerProtos.ContainerType.KeyValueContainer, 1L); + keyValueContainerData = new KeyValueContainerData(1L); keyValueContainer = new KeyValueContainer( keyValueContainerData, conf); @@ -197,7 +197,8 @@ public void testCloseContainer() throws Exception { File containerFile = KeyValueContainerLocationUtil.getContainerFile( containerMetaDataLoc, containerName); - keyValueContainerData = KeyValueYaml.readContainerFile(containerFile); + keyValueContainerData = (KeyValueContainerData) ContainerDataYaml + .readContainerFile(containerFile); assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED, keyValueContainerData.getState()); } @@ -237,7 +238,8 @@ public void testUpdateContainer() throws IOException { File containerFile = KeyValueContainerLocationUtil.getContainerFile( containerMetaDataLoc, containerName); - keyValueContainerData = KeyValueYaml.readContainerFile(containerFile); + keyValueContainerData = (KeyValueContainerData) ContainerDataYaml + .readContainerFile(containerFile); assertEquals(2, keyValueContainerData.getMetadata().size()); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index f4dd41c846..dbddf47d55 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -74,9 +74,10 @@ private void setup() throws Exception { .build(); this.volumeSet = new VolumeSet(datanodeDetails, conf); - this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, SCM_ID); - this.handler = (KeyValueHandler) dispatcher.getHandlerForContainerType( + this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet); + this.handler = (KeyValueHandler) dispatcher.getHandler( ContainerProtos.ContainerType.KeyValueContainer); + dispatcher.setScmId(UUID.randomUUID().toString()); } @Test @@ -87,8 +88,7 @@ public void testHandlerCommandHandling() throws Exception{ // Create mock HddsDispatcher and KeyValueHandler. this.handler = Mockito.mock(KeyValueHandler.class); this.dispatcher = Mockito.mock(HddsDispatcher.class); - Mockito.when(dispatcher.getHandlerForContainerType(any())).thenReturn - (handler); + Mockito.when(dispatcher.getHandler(any())).thenReturn(handler); Mockito.when(dispatcher.dispatch(any())).thenCallRealMethod(); Mockito.when(dispatcher.getContainer(anyLong())).thenReturn( Mockito.mock(KeyValueContainer.class)); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java new file mode 100644 index 0000000000..cf4bb6293f --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.ozoneimpl; + + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import java.util.Random; +import java.util.UUID; + + +import static org.junit.Assert.assertEquals; + +/** + * This class is used to test OzoneContainer. + */ +public class TestOzoneContainer { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + + private OzoneConfiguration conf; + private String scmId = UUID.randomUUID().toString(); + private VolumeSet volumeSet; + private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy; + private KeyValueContainerData keyValueContainerData; + private KeyValueContainer keyValueContainer; + private final DatanodeDetails datanodeDetails = createDatanodeDetails(); + + @Before + public void setUp() throws Exception { + conf = new OzoneConfiguration(); + conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, folder.getRoot() + .getAbsolutePath() + "," + folder.newFolder().getAbsolutePath()); + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, folder.newFolder().getAbsolutePath()); + volumeSet = new VolumeSet(datanodeDetails, conf); + volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy(); + + for (int i=0; i<10; i++) { + keyValueContainerData = new KeyValueContainerData(i); + keyValueContainer = new KeyValueContainer( + keyValueContainerData, conf); + keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId); + } + } + + @Test + public void testBuildContainerMap() throws Exception { + OzoneContainer ozoneContainer = new + OzoneContainer(datanodeDetails, conf); + ContainerSet containerset = ozoneContainer.getContainerSet(); + assertEquals(10, containerset.containerCount()); + } + + + private DatanodeDetails createDatanodeDetails() { + Random random = new Random(); + String ipAddress = + random.nextInt(256) + "." + random.nextInt(256) + "." + random + .nextInt(256) + "." + random.nextInt(256); + + String uuid = UUID.randomUUID().toString(); + String hostName = uuid; + DatanodeDetails.Port containerPort = DatanodeDetails.newPort( + DatanodeDetails.Port.Name.STANDALONE, 0); + DatanodeDetails.Port ratisPort = DatanodeDetails.newPort( + DatanodeDetails.Port.Name.RATIS, 0); + DatanodeDetails.Port restPort = DatanodeDetails.newPort( + DatanodeDetails.Port.Name.REST, 0); + DatanodeDetails.Builder builder = DatanodeDetails.newBuilder(); + builder.setUuid(uuid) + .setHostName("localhost") + .setIpAddress(ipAddress) + .addPort(containerPort) + .addPort(ratisPort) + .addPort(restPort); + return builder.build(); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index b339fb7ce9..9ac99303bd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -40,6 +40,7 @@ .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; @@ -703,6 +704,9 @@ long getLastHBProcessedCount() { public VersionResponse getVersion(SCMVersionRequestProto versionRequest) { return VersionResponse.newBuilder() .setVersion(this.version.getVersion()) + .addValue(OzoneConsts.SCM_ID, this.scmManager.getScmStorage().getScmId()) + .addValue(OzoneConsts.CLUSTER_ID, this.scmManager.getScmStorage() + .getClusterID()) .build(); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index 34779daf94..9db9e802ec 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.VersionInfo; @@ -125,12 +126,14 @@ public void testGetVersion() throws Exception { * how the state machine would make the call. */ public void testGetVersionTask() throws Exception { - Configuration conf = SCMTestUtils.getConf(); + OzoneConfiguration conf = SCMTestUtils.getConf(); try (EndpointStateMachine rpcEndPoint = createEndpoint(conf, serverAddress, 1000)) { + OzoneContainer ozoneContainer = new OzoneContainer(getDatanodeDetails(), + conf); rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, - conf); + conf, ozoneContainer); EndpointStateMachine.EndPointStates newState = versionTask.call(); // if version call worked the endpoint should automatically move to the @@ -149,14 +152,16 @@ public void testGetVersionTask() throws Exception { * expect that versionTask should be able to handle it. */ public void testGetVersionToInvalidEndpoint() throws Exception { - Configuration conf = SCMTestUtils.getConf(); + OzoneConfiguration conf = SCMTestUtils.getConf(); InetSocketAddress nonExistentServerAddress = SCMTestUtils .getReuseableAddress(); try (EndpointStateMachine rpcEndPoint = createEndpoint(conf, nonExistentServerAddress, 1000)) { rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); - VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, + OzoneContainer ozoneContainer = new OzoneContainer(getDatanodeDetails(), conf); + VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, + conf, ozoneContainer); EndpointStateMachine.EndPointStates newState = versionTask.call(); // This version call did NOT work, so endpoint should remain in the same @@ -175,13 +180,15 @@ public void testGetVersionToInvalidEndpoint() throws Exception { public void testGetVersionAssertRpcTimeOut() throws Exception { final long rpcTimeout = 1000; final long tolerance = 100; - Configuration conf = SCMTestUtils.getConf(); + OzoneConfiguration conf = SCMTestUtils.getConf(); try (EndpointStateMachine rpcEndPoint = createEndpoint(conf, serverAddress, (int) rpcTimeout)) { rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); - VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, + OzoneContainer ozoneContainer = new OzoneContainer(getDatanodeDetails(), conf); + VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint, + conf, ozoneContainer); scmServerImpl.setRpcResponseDelay(1500); long start = Time.monotonicNow(); @@ -386,4 +393,5 @@ private ContainerReportsProto createContainerReport( } return reportsBuilder.build(); } + } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java index c937980f51..ad1e706bf7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java @@ -27,8 +27,10 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; -import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; @@ -163,8 +165,9 @@ private MetadataStore getContainerMetadata(Long containerID) DatanodeDetails leadDN = container.getPipeline().getLeader(); OzoneContainer containerServer = getContainerServerByDatanodeUuid(leadDN.getUuidString()); - ContainerData containerData = containerServer.getContainerManager() - .readContainer(containerID); + KeyValueContainerData containerData = (KeyValueContainerData) containerServer + .getContainerSet() + .getContainer(containerID).getContainerData(); return KeyUtils.getDB(containerData, conf); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java index 9e8cb468bb..b832dd2556 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java @@ -32,7 +32,7 @@ import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; @@ -183,7 +183,7 @@ public void testCloseContainerViaRatis() throws IOException, for (DatanodeDetails datanodeDetails : datanodes) { GenericTestUtils.waitFor( () -> isContainerClosed(cluster, containerID, datanodeDetails), 500, - 5 * 1000); + 15 * 1000); //double check if it's really closed (waitFor also throws an exception) Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails)); } @@ -204,7 +204,7 @@ private Boolean isContainerClosed(MiniOzoneCluster cluster, long containerID, if (datanode.equals(datanodeService.getDatanodeDetails())) { containerData = datanodeService.getDatanodeStateMachine().getContainer() - .getContainerManager().readContainer(containerID); + .getContainerSet().getContainer(containerID).getContainerData(); if (!containerData.isOpen()) { // make sure the closeContainerHandler on the Datanode is invoked Assert.assertTrue( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java index efb734497f..114bd04820 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.rest.OzoneException; -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; @@ -104,8 +104,8 @@ private Boolean isContainerClosed(MiniOzoneCluster cluster, ContainerData containerData; try { containerData = cluster.getHddsDatanodes().get(0) - .getDatanodeStateMachine().getContainer().getContainerManager() - .readContainer(containerID); + .getDatanodeStateMachine().getContainer().getContainerSet() + .getContainer(containerID).getContainerData(); return !containerData.isOpen(); } catch (StorageContainerException e) { throw new AssertionError(e); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java index 3f0203659a..18b325ba8b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java @@ -35,10 +35,7 @@ import org.junit.Test; import org.junit.rules.Timeout; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CompletableFuture; /** @@ -66,7 +63,11 @@ public void testCreateOzoneContainer() throws Exception { conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, pipeline.getLeader() .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()); conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false); - container = new OzoneContainer(TestUtils.getDatanodeDetails(), conf); + + container = new OzoneContainer(TestUtils.getDatanodeDetails(), + conf); + //Setting scmId, as we start manually ozone container. + container.getDispatcher().setScmId(UUID.randomUUID().toString()); container.start(); XceiverClient client = new XceiverClient(pipeline, conf); @@ -392,7 +393,7 @@ public void testDeleteContainer() throws Exception { response = client.sendCommand(request); Assert.assertNotNull(response); - Assert.assertEquals(ContainerProtos.Result.UNCLOSED_CONTAINER_IO, + Assert.assertEquals(ContainerProtos.Result.DELETE_ON_OPEN_CONTAINER, response.getResult()); Assert.assertTrue(request.getTraceID().equals(response.getTraceID())); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java index d4c572f116..bd9259ddce 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.server; +import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.ratis.shaded.io.netty.channel.embedded.EmbeddedChannel; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -262,5 +263,14 @@ public void init() { @Override public void shutdown() { } + @Override + public Handler getHandler(ContainerProtos.ContainerType containerType) { + return null; + } + + @Override + public void setScmId(String scmId) { + + } } } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java index bafba32008..adce0efc84 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestContainerReportWithKeys.java @@ -27,8 +27,8 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.*; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; @@ -119,8 +119,8 @@ public void testContainerReportKeyWrite() throws Exception { ContainerData cd = getContainerData(keyInfo.getContainerID()); - LOG.info("DN Container Data: keyCount: {} used: {} ", - cd.getKeyCount(), cd.getBytesUsed()); +/* LOG.info("DN Container Data: keyCount: {} used: {} ", + cd.getKeyCount(), cd.getBytesUsed());*/ ContainerInfo cinfo = scm.getContainerInfo(keyInfo.getContainerID()); @@ -132,9 +132,9 @@ public void testContainerReportKeyWrite() throws Exception { private static ContainerData getContainerData(long containerID) { ContainerData containerData; try { - ContainerManager containerManager = cluster.getHddsDatanodes().get(0) - .getDatanodeStateMachine().getContainer().getContainerManager(); - containerData = containerManager.readContainer(containerID); + ContainerSet containerManager = cluster.getHddsDatanodes().get(0) + .getDatanodeStateMachine().getContainer().getContainerSet(); + containerData = containerManager.getContainer(containerID).getContainerData(); } catch (StorageContainerException e) { throw new AssertionError(e); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java index b86c57721c..cda54cbdef 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java @@ -44,9 +44,10 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.protocol.ClientProtocol; import org.apache.hadoop.ozone.client.rpc.RpcClient; -import org.apache.hadoop.ozone.container.common.helpers.ContainerData; -import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.ksm.KeySpaceManager; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; @@ -698,13 +699,16 @@ public void testDeleteKey() throws Exception { List locations = keyInfo.getLatestVersionLocations().getLocationList(); for (KsmKeyLocationInfo location : locations) { - KeyData keyData = new KeyData(location.getBlockID()); - KeyData blockInfo = cm.getContainerManager() - .getKeyManager().getKey(keyData); - ContainerData containerData = cm.getContainerManager() - .readContainer(keyData.getContainerID()); - File dataDir = ContainerUtils - .getDataDirectory(containerData).toFile(); + KeyValueHandler keyValueHandler = (KeyValueHandler) cm + .getDispatcher().getHandler(ContainerProtos.ContainerType + .KeyValueContainer); + KeyValueContainer container = (KeyValueContainer) cm.getContainerSet() + .getContainer(location.getBlockID().getContainerID()); + KeyData blockInfo = keyValueHandler + .getKeyManager().getKey(container, location.getBlockID()); + KeyValueContainerData containerData = (KeyValueContainerData) container + .getContainerData(); + File dataDir = new File(containerData.getChunksPath()); for (ContainerProtos.ChunkInfo chunkInfo : blockInfo.getChunks()) { File chunkFile = dataDir.toPath() .resolve(chunkInfo.getChunkName()).toFile();