HDDS-183:Integrate Volumeset, ContainerSet and HddsDispatcher. Contributed by Bharat Viswanadham
This commit is contained in:
parent
13579f9296
commit
52d1d9603e
@ -137,6 +137,7 @@ enum Result {
|
||||
CONTAINER_METADATA_ERROR = 31;
|
||||
CONTAINER_FILES_CREATE_ERROR = 32;
|
||||
CONTAINER_CHECKSUM_ERROR = 33;
|
||||
UNKNOWN_CONTAINER_TYPE = 34;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
|
||||
ContainerType;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
|
||||
|
@ -16,11 +16,12 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.keyvalue;
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||
import org.yaml.snakeyaml.Yaml;
|
||||
|
||||
import java.beans.IntrospectionException;
|
||||
@ -46,13 +47,16 @@
|
||||
import org.yaml.snakeyaml.nodes.Tag;
|
||||
import org.yaml.snakeyaml.representer.Representer;
|
||||
|
||||
import static org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData.YAML_FIELDS;
|
||||
import static org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData.YAML_TAG;
|
||||
|
||||
/**
|
||||
* Class for creating and reading .container files.
|
||||
*/
|
||||
|
||||
public final class KeyValueYaml {
|
||||
public final class ContainerDataYaml {
|
||||
|
||||
private KeyValueYaml() {
|
||||
private ContainerDataYaml() {
|
||||
|
||||
}
|
||||
/**
|
||||
@ -62,29 +66,39 @@ private KeyValueYaml() {
|
||||
* @param containerData
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void createContainerFile(File containerFile, ContainerData
|
||||
containerData) throws IOException {
|
||||
public static void createContainerFile(ContainerProtos.ContainerType
|
||||
containerType, File containerFile,
|
||||
ContainerData containerData) throws
|
||||
IOException {
|
||||
|
||||
Preconditions.checkNotNull(containerFile, "yamlFile cannot be null");
|
||||
Preconditions.checkNotNull(containerData, "containerData cannot be null");
|
||||
Preconditions.checkNotNull(containerType, "containerType cannot be null");
|
||||
|
||||
PropertyUtils propertyUtils = new PropertyUtils();
|
||||
propertyUtils.setBeanAccess(BeanAccess.FIELD);
|
||||
propertyUtils.setAllowReadOnlyProperties(true);
|
||||
|
||||
Representer representer = new KeyValueContainerDataRepresenter();
|
||||
switch(containerType) {
|
||||
case KeyValueContainer:
|
||||
Representer representer = new ContainerDataRepresenter();
|
||||
representer.setPropertyUtils(propertyUtils);
|
||||
representer.addClassTag(
|
||||
KeyValueContainerData.class, new Tag("KeyValueContainerData"));
|
||||
representer.addClassTag(KeyValueContainerData.class,
|
||||
KeyValueContainerData.YAML_TAG);
|
||||
|
||||
Constructor keyValueDataConstructor = new KeyValueDataConstructor();
|
||||
Constructor keyValueDataConstructor = new ContainerDataConstructor();
|
||||
|
||||
Yaml yaml = new Yaml(keyValueDataConstructor, representer);
|
||||
|
||||
Writer writer = new OutputStreamWriter(new FileOutputStream(containerFile),
|
||||
"UTF-8");
|
||||
Writer writer = new OutputStreamWriter(new FileOutputStream(
|
||||
containerFile), "UTF-8");
|
||||
yaml.dump(containerData, writer);
|
||||
writer.close();
|
||||
break;
|
||||
default:
|
||||
throw new StorageContainerException("Unrecognized container Type " +
|
||||
"format " + containerType, ContainerProtos.Result
|
||||
.UNKNOWN_CONTAINER_TYPE);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -93,57 +107,53 @@ public static void createContainerFile(File containerFile, ContainerData
|
||||
* @param containerFile
|
||||
* @throws IOException
|
||||
*/
|
||||
public static KeyValueContainerData readContainerFile(File containerFile)
|
||||
public static ContainerData readContainerFile(File containerFile)
|
||||
throws IOException {
|
||||
Preconditions.checkNotNull(containerFile, "containerFile cannot be null");
|
||||
|
||||
InputStream input = null;
|
||||
KeyValueContainerData keyValueContainerData;
|
||||
ContainerData containerData;
|
||||
try {
|
||||
PropertyUtils propertyUtils = new PropertyUtils();
|
||||
propertyUtils.setBeanAccess(BeanAccess.FIELD);
|
||||
propertyUtils.setAllowReadOnlyProperties(true);
|
||||
|
||||
Representer representer = new KeyValueContainerDataRepresenter();
|
||||
Representer representer = new ContainerDataRepresenter();
|
||||
representer.setPropertyUtils(propertyUtils);
|
||||
representer.addClassTag(
|
||||
KeyValueContainerData.class, new Tag("KeyValueContainerData"));
|
||||
|
||||
Constructor keyValueDataConstructor = new KeyValueDataConstructor();
|
||||
Constructor containerDataConstructor = new ContainerDataConstructor();
|
||||
|
||||
Yaml yaml = new Yaml(keyValueDataConstructor, representer);
|
||||
Yaml yaml = new Yaml(containerDataConstructor, representer);
|
||||
yaml.setBeanAccess(BeanAccess.FIELD);
|
||||
|
||||
input = new FileInputStream(containerFile);
|
||||
keyValueContainerData = (KeyValueContainerData)
|
||||
containerData = (ContainerData)
|
||||
yaml.load(input);
|
||||
} finally {
|
||||
if (input!= null) {
|
||||
input.close();
|
||||
}
|
||||
}
|
||||
return keyValueContainerData;
|
||||
return containerData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Representer class to define which fields need to be stored in yaml file.
|
||||
*/
|
||||
private static class KeyValueContainerDataRepresenter extends Representer {
|
||||
private static class ContainerDataRepresenter extends Representer {
|
||||
@Override
|
||||
protected Set<Property> getProperties(Class<? extends Object> type)
|
||||
throws IntrospectionException {
|
||||
Set<Property> set = super.getProperties(type);
|
||||
Set<Property> filtered = new TreeSet<Property>();
|
||||
|
||||
// When a new Container type is added, we need to add what fields need
|
||||
// to be filtered here
|
||||
if (type.equals(KeyValueContainerData.class)) {
|
||||
// filter properties
|
||||
for (Property prop : set) {
|
||||
String name = prop.getName();
|
||||
// When a new field needs to be added, it needs to be added here.
|
||||
if (name.equals("containerType") || name.equals("containerId") ||
|
||||
name.equals("layOutVersion") || name.equals("state") ||
|
||||
name.equals("metadata") || name.equals("metadataPath") ||
|
||||
name.equals("chunksPath") || name.equals(
|
||||
"containerDBType")) {
|
||||
if (YAML_FIELDS.contains(name)) {
|
||||
filtered.add(prop);
|
||||
}
|
||||
}
|
||||
@ -155,11 +165,12 @@ protected Set<Property> getProperties(Class<? extends Object> type)
|
||||
/**
|
||||
* Constructor class for KeyValueData, which will be used by Yaml.
|
||||
*/
|
||||
private static class KeyValueDataConstructor extends Constructor {
|
||||
KeyValueDataConstructor() {
|
||||
private static class ContainerDataConstructor extends Constructor {
|
||||
ContainerDataConstructor() {
|
||||
//Adding our own specific constructors for tags.
|
||||
this.yamlConstructors.put(new Tag("KeyValueContainerData"),
|
||||
new ConstructKeyValueContainerData());
|
||||
// When a new Container type is added, we need to add yamlConstructor
|
||||
// for that
|
||||
this.yamlConstructors.put(YAML_TAG, new ConstructKeyValueContainerData());
|
||||
this.yamlConstructors.put(Tag.INT, new ConstructLong());
|
||||
}
|
||||
|
||||
@ -167,21 +178,14 @@ private class ConstructKeyValueContainerData extends AbstractConstruct {
|
||||
public Object construct(Node node) {
|
||||
MappingNode mnode = (MappingNode) node;
|
||||
Map<Object, Object> nodes = constructMapping(mnode);
|
||||
String type = (String) nodes.get("containerType");
|
||||
|
||||
ContainerProtos.ContainerType containerType = ContainerProtos
|
||||
.ContainerType.KeyValueContainer;
|
||||
if (type.equals("KeyValueContainer")) {
|
||||
containerType = ContainerProtos.ContainerType.KeyValueContainer;
|
||||
}
|
||||
|
||||
//Needed this, as TAG.INT type is by default converted to Long.
|
||||
long layOutVersion = (long) nodes.get("layOutVersion");
|
||||
int lv = (int) layOutVersion;
|
||||
|
||||
//When a new field is added, it needs to be added here.
|
||||
KeyValueContainerData kvData = new KeyValueContainerData(containerType,
|
||||
(long) nodes.get("containerId"), lv);
|
||||
KeyValueContainerData kvData = new KeyValueContainerData((long) nodes
|
||||
.get("containerId"), lv);
|
||||
kvData.setContainerDBType((String)nodes.get("containerDBType"));
|
||||
kvData.setMetadataPath((String) nodes.get(
|
||||
"metadataPath"));
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
|
||||
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||
@ -90,6 +91,16 @@ public void init() {
|
||||
public void shutdown() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Handler getHandler(ContainerProtos.ContainerType containerType) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setScmId(String scmId) {
|
||||
// DO nothing, this will be removed when cleanup.
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerCommandResponseProto dispatch(
|
||||
ContainerCommandRequestProto msg) {
|
||||
|
@ -52,24 +52,23 @@ public class HddsDispatcher implements ContainerDispatcher {
|
||||
private final Configuration conf;
|
||||
private final ContainerSet containerSet;
|
||||
private final VolumeSet volumeSet;
|
||||
private final String scmID;
|
||||
private String scmID;
|
||||
|
||||
/**
|
||||
* Constructs an OzoneContainer that receives calls from
|
||||
* XceiverServerHandler.
|
||||
*/
|
||||
public HddsDispatcher(Configuration config, ContainerSet contSet,
|
||||
VolumeSet volumes, String scmId) {
|
||||
// TODO: Pass ContainerSet, VolumeSet and scmID, intialize metrics
|
||||
VolumeSet volumes) {
|
||||
//TODO: initialize metrics
|
||||
this.conf = config;
|
||||
this.containerSet = contSet;
|
||||
this.volumeSet = volumes;
|
||||
this.scmID = scmId;
|
||||
this.handlers = Maps.newHashMap();
|
||||
for (ContainerType containerType : ContainerType.values()) {
|
||||
handlers.put(containerType,
|
||||
Handler.getHandlerForContainerType(
|
||||
containerType, conf, containerSet, volumeSet, scmID));
|
||||
containerType, conf, containerSet, volumeSet));
|
||||
}
|
||||
}
|
||||
|
||||
@ -103,7 +102,7 @@ public ContainerCommandResponseProto dispatch(
|
||||
return ContainerUtils.logAndReturnError(LOG, ex, msg);
|
||||
}
|
||||
|
||||
Handler handler = getHandlerForContainerType(containerType);
|
||||
Handler handler = getHandler(containerType);
|
||||
if (handler == null) {
|
||||
StorageContainerException ex = new StorageContainerException("Invalid " +
|
||||
"ContainerType " + containerType,
|
||||
@ -113,9 +112,20 @@ public ContainerCommandResponseProto dispatch(
|
||||
return handler.handle(msg, container);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Handler getHandlerForContainerType(ContainerType type) {
|
||||
return handlers.get(type);
|
||||
@Override
|
||||
public Handler getHandler(ContainerProtos.ContainerType containerType) {
|
||||
return handlers.get(containerType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setScmId(String scmId) {
|
||||
Preconditions.checkNotNull(scmId, "scmId Cannot be null");
|
||||
if (this.scmID == null) {
|
||||
this.scmID = scmId;
|
||||
for (Map.Entry<ContainerType, Handler> handlerMap : handlers.entrySet()) {
|
||||
handlerMap.getValue().setScmID(scmID);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private long getContainerID(ContainerCommandRequestProto request)
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.interfaces;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerCommandRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
@ -48,4 +49,17 @@ public interface ContainerDispatcher {
|
||||
* Shutdown Dispatcher services.
|
||||
*/
|
||||
void shutdown();
|
||||
|
||||
/**
|
||||
* Returns the handler for the specified containerType.
|
||||
* @param containerType
|
||||
* @return
|
||||
*/
|
||||
Handler getHandler(ContainerProtos.ContainerType containerType);
|
||||
|
||||
/**
|
||||
* If scmId is not set, this will set scmId, otherwise it is a no-op.
|
||||
* @param scmId
|
||||
*/
|
||||
void setScmId(String scmId);
|
||||
}
|
||||
|
@ -31,7 +31,6 @@
|
||||
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
|
||||
@ -42,22 +41,20 @@ public class Handler {
|
||||
protected final Configuration conf;
|
||||
protected final ContainerSet containerSet;
|
||||
protected final VolumeSet volumeSet;
|
||||
protected final String scmID;
|
||||
protected String scmID;
|
||||
|
||||
protected Handler(Configuration config, ContainerSet contSet,
|
||||
VolumeSet volumeSet, String scmID) {
|
||||
VolumeSet volumeSet) {
|
||||
conf = config;
|
||||
containerSet = contSet;
|
||||
this.volumeSet = volumeSet;
|
||||
this.scmID = scmID;
|
||||
}
|
||||
|
||||
public static Handler getHandlerForContainerType(ContainerType containerType,
|
||||
Configuration config, ContainerSet contSet, VolumeSet volumeSet,
|
||||
String scmID) {
|
||||
Configuration config, ContainerSet contSet, VolumeSet volumeSet) {
|
||||
switch (containerType) {
|
||||
case KeyValueContainer:
|
||||
return KeyValueHandler.getInstance(config, contSet, volumeSet, scmID);
|
||||
return KeyValueHandler.getInstance(config, contSet, volumeSet);
|
||||
default:
|
||||
throw new IllegalArgumentException("Handler for ContainerType: " +
|
||||
containerType + "doesn't exist.");
|
||||
@ -68,4 +65,8 @@ public ContainerCommandResponseProto handle(
|
||||
ContainerCommandRequestProto msg, Container container) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public void setScmID(String scmId) {
|
||||
this.scmID = scmId;
|
||||
}
|
||||
}
|
||||
|
@ -93,8 +93,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
|
||||
// trick.
|
||||
commandDispatcher = CommandDispatcher.newBuilder()
|
||||
.addHandler(new CloseContainerCommandHandler())
|
||||
.addHandler(new DeleteBlocksCommandHandler(
|
||||
container.getContainerManager(), conf))
|
||||
.addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(),
|
||||
conf))
|
||||
.setConnectionManager(connectionManager)
|
||||
.setContainer(container)
|
||||
.setContext(context)
|
||||
|
@ -97,6 +97,7 @@ public BlockDeletingService(ContainerManager containerManager,
|
||||
OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public BackgroundTaskQueue getTasks() {
|
||||
BackgroundTaskQueue queue = new BackgroundTaskQueue();
|
||||
|
@ -18,8 +18,10 @@
|
||||
|
||||
import com.google.common.primitives.Longs;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
|
||||
@ -29,11 +31,13 @@
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers
|
||||
.DeletedContainerBlocksSummary;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine
|
||||
.EndpointStateMachine;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine
|
||||
@ -51,6 +55,8 @@
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
|
||||
|
||||
/**
|
||||
* Handle block deletion commands.
|
||||
*/
|
||||
@ -59,14 +65,14 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);
|
||||
|
||||
private ContainerManager containerManager;
|
||||
private Configuration conf;
|
||||
private final ContainerSet containerSet;
|
||||
private final Configuration conf;
|
||||
private int invocationCount;
|
||||
private long totalTime;
|
||||
|
||||
public DeleteBlocksCommandHandler(ContainerManager containerManager,
|
||||
public DeleteBlocksCommandHandler(ContainerSet cset,
|
||||
Configuration conf) {
|
||||
this.containerManager = containerManager;
|
||||
this.containerSet = cset;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@ -105,8 +111,24 @@ public void handle(SCMCommand command, OzoneContainer container,
|
||||
DeleteBlockTransactionResult.newBuilder();
|
||||
txResultBuilder.setTxID(entry.getTxID());
|
||||
try {
|
||||
deleteContainerBlocks(entry, conf);
|
||||
long containerId = entry.getContainerID();
|
||||
Container cont = containerSet.getContainer(containerId);
|
||||
if(cont == null) {
|
||||
throw new StorageContainerException("Unable to find the container "
|
||||
+ containerId, CONTAINER_NOT_FOUND);
|
||||
}
|
||||
ContainerProtos.ContainerType containerType = cont.getContainerType();
|
||||
switch (containerType) {
|
||||
case KeyValueContainer:
|
||||
KeyValueContainerData containerData = (KeyValueContainerData)
|
||||
cont.getContainerData();
|
||||
deleteKeyValueContainerBlocks(containerData, entry);
|
||||
txResultBuilder.setSuccess(true);
|
||||
break;
|
||||
default:
|
||||
LOG.error("Delete Blocks Command Handler is not implemented for " +
|
||||
"containerType {}", containerType);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to delete blocks for container={}, TXID={}",
|
||||
entry.getContainerID(), entry.getTxID(), e);
|
||||
@ -145,21 +167,21 @@ public void handle(SCMCommand command, OzoneContainer container,
|
||||
* Move a bunch of blocks from a container to deleting state.
|
||||
* This is a meta update, the actual deletes happen in async mode.
|
||||
*
|
||||
* @param containerData - KeyValueContainerData
|
||||
* @param delTX a block deletion transaction.
|
||||
* @param config configuration.
|
||||
* @throws IOException if I/O error occurs.
|
||||
*/
|
||||
private void deleteContainerBlocks(DeletedBlocksTransaction delTX,
|
||||
Configuration config) throws IOException {
|
||||
private void deleteKeyValueContainerBlocks(
|
||||
KeyValueContainerData containerData, DeletedBlocksTransaction delTX)
|
||||
throws IOException {
|
||||
long containerId = delTX.getContainerID();
|
||||
ContainerData containerInfo = containerManager.readContainer(containerId);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Processing Container : {}, DB path : {}", containerId,
|
||||
containerInfo.getDBPath());
|
||||
containerData.getMetadataPath());
|
||||
}
|
||||
|
||||
int newDeletionBlocks = 0;
|
||||
MetadataStore containerDB = KeyUtils.getDB(containerInfo, config);
|
||||
MetadataStore containerDB = KeyUtils.getDB(containerData, conf);
|
||||
for (Long blk : delTX.getLocalIDList()) {
|
||||
BatchOperation batch = new BatchOperation();
|
||||
byte[] blkBytes = Longs.toByteArray(blk);
|
||||
@ -187,12 +209,12 @@ private void deleteContainerBlocks(DeletedBlocksTransaction delTX,
|
||||
+ " container {}, skip deleting it.", blk, containerId);
|
||||
}
|
||||
containerDB.put(DFSUtil.string2Bytes(
|
||||
OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + delTX.getContainerID()),
|
||||
OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + containerId),
|
||||
Longs.toByteArray(delTX.getTxID()));
|
||||
}
|
||||
|
||||
// update pending deletion blocks count in in-memory container status
|
||||
containerManager.incrPendingDeletionBlocks(newDeletionBlocks, containerId);
|
||||
containerData.incrPendingDeletionBlocks(newDeletionBlocks);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -95,7 +95,8 @@ public void execute(ExecutorService executor) {
|
||||
getEndPointTask(EndpointStateMachine endpoint) {
|
||||
switch (endpoint.getState()) {
|
||||
case GETVERSION:
|
||||
return new VersionEndpointTask(endpoint, conf);
|
||||
return new VersionEndpointTask(endpoint, conf, context.getParent()
|
||||
.getContainer());
|
||||
case REGISTER:
|
||||
return RegisterEndpointTask.newBuilder()
|
||||
.setConfig(conf)
|
||||
|
@ -16,14 +16,22 @@
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.states.endpoint;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine
|
||||
.EndpointStateMachine;
|
||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
|
||||
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||
import org.apache.hadoop.ozone.protocol.VersionResponse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
@ -33,11 +41,13 @@ public class VersionEndpointTask implements
|
||||
Callable<EndpointStateMachine.EndPointStates> {
|
||||
private final EndpointStateMachine rpcEndPoint;
|
||||
private final Configuration configuration;
|
||||
private final OzoneContainer ozoneContainer;
|
||||
|
||||
public VersionEndpointTask(EndpointStateMachine rpcEndPoint,
|
||||
Configuration conf) {
|
||||
Configuration conf, OzoneContainer container) {
|
||||
this.rpcEndPoint = rpcEndPoint;
|
||||
this.configuration = conf;
|
||||
this.ozoneContainer = container;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -52,7 +62,27 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
|
||||
try{
|
||||
SCMVersionResponseProto versionResponse =
|
||||
rpcEndPoint.getEndPoint().getVersion(null);
|
||||
rpcEndPoint.setVersion(VersionResponse.getFromProtobuf(versionResponse));
|
||||
VersionResponse response = VersionResponse.getFromProtobuf(
|
||||
versionResponse);
|
||||
rpcEndPoint.setVersion(response);
|
||||
VolumeSet volumeSet = ozoneContainer.getVolumeSet();
|
||||
Map<String, HddsVolume> volumeMap = volumeSet.getVolumeMap();
|
||||
List<HddsProtos.KeyValue> keyValues = versionResponse.getKeysList();
|
||||
|
||||
String scmId = response.getValue(OzoneConsts.SCM_ID);
|
||||
String clusterId = response.getValue(OzoneConsts.CLUSTER_ID);
|
||||
|
||||
Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " +
|
||||
"null");
|
||||
Preconditions.checkNotNull(scmId, "Reply from SCM: clusterId cannot be" +
|
||||
" null");
|
||||
|
||||
// If version file does not exist create version file and also set scmId
|
||||
for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
|
||||
HddsVolume hddsVolume = entry.getValue();
|
||||
hddsVolume.format(clusterId);
|
||||
ozoneContainer.getDispatcher().setScmId(scmId);
|
||||
}
|
||||
|
||||
EndpointStateMachine.EndPointStates nextState =
|
||||
rpcEndPoint.getState().getNextState();
|
||||
|
@ -130,6 +130,10 @@ private HddsVolume(Builder b) throws IOException {
|
||||
initialize();
|
||||
}
|
||||
|
||||
public VolumeInfo getVolumeInfo() {
|
||||
return volumeInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the volume.
|
||||
* Creates the Version file if not present,
|
||||
@ -327,4 +331,6 @@ public enum VolumeState {
|
||||
public void setScmUsageForTesting(GetSpaceUsed scmUsageForTest) {
|
||||
volumeInfo.setScmUsageForTesting(scmUsageForTest);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -27,8 +27,13 @@
|
||||
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
|
||||
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
|
||||
import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
|
||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
|
||||
@ -309,4 +314,47 @@ public Map<String, HddsVolume> getVolumeMap() {
|
||||
public Map<StorageType, List<HddsVolume>> getVolumeStateMap() {
|
||||
return ImmutableMap.copyOf(volumeStateMap);
|
||||
}
|
||||
|
||||
public StorageContainerDatanodeProtocolProtos.NodeReportProto getNodeReport()
|
||||
throws IOException {
|
||||
boolean failed;
|
||||
StorageLocationReport[] reports =
|
||||
new StorageLocationReport[volumeMap.size()];
|
||||
int counter = 0;
|
||||
for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
|
||||
HddsVolume hddsVolume = entry.getValue();
|
||||
VolumeInfo volumeInfo = hddsVolume.getVolumeInfo();
|
||||
long scmUsed = 0;
|
||||
long remaining = 0;
|
||||
failed = false;
|
||||
try {
|
||||
scmUsed = volumeInfo.getScmUsed();
|
||||
remaining = volumeInfo.getAvailable();
|
||||
} catch (IOException ex) {
|
||||
LOG.warn("Failed to get scmUsed and remaining for container " +
|
||||
"storage location {}", volumeInfo.getRootDir());
|
||||
// reset scmUsed and remaining if df/du failed.
|
||||
scmUsed = 0;
|
||||
remaining = 0;
|
||||
failed = true;
|
||||
}
|
||||
|
||||
StorageLocationReport.Builder builder =
|
||||
StorageLocationReport.newBuilder();
|
||||
builder.setStorageLocation(volumeInfo.getRootDir())
|
||||
.setId(hddsVolume.getStorageID())
|
||||
.setFailed(failed)
|
||||
.setCapacity(hddsVolume.getCapacity())
|
||||
.setRemaining(remaining)
|
||||
.setScmUsed(scmUsed)
|
||||
.setStorageType(hddsVolume.getStorageType());
|
||||
StorageLocationReport r = builder.build();
|
||||
reports[counter++] = r;
|
||||
}
|
||||
NodeReportProto.Builder nrb = NodeReportProto.newBuilder();
|
||||
for (int i = 0; i < reports.length; i++) {
|
||||
nrb.addStorageReport(reports[i].getProtoBufMessage());
|
||||
}
|
||||
return nrb.build();
|
||||
}
|
||||
}
|
@ -19,7 +19,6 @@
|
||||
package org.apache.hadoop.ozone.container.keyvalue;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
@ -33,6 +32,7 @@
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
|
||||
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
|
||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||
@ -47,21 +47,16 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.Writer;
|
||||
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.Result.CONTAINER_ALREADY_EXISTS;
|
||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.Result.CONTAINER_CHECKSUM_ERROR;
|
||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.Result.CONTAINER_METADATA_ERROR;
|
||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
@ -74,8 +69,6 @@
|
||||
.Result.ERROR_IN_COMPACT_DB;
|
||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.Result.INVALID_CONTAINER_STATE;
|
||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.Result.NO_SUCH_ALGORITHM;
|
||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.Result.UNSUPPORTED_REQUEST;
|
||||
|
||||
@ -198,10 +191,12 @@ private void createContainerFile(File containerFile, File
|
||||
try {
|
||||
tempContainerFile = createTempFile(containerFile);
|
||||
tempCheckSumFile = createTempFile(containerCheckSumFile);
|
||||
KeyValueYaml.createContainerFile(tempContainerFile, containerData);
|
||||
ContainerDataYaml.createContainerFile(ContainerProtos.ContainerType
|
||||
.KeyValueContainer, tempContainerFile, containerData);
|
||||
|
||||
//Compute Checksum for container file
|
||||
String checksum = computeCheckSum(tempContainerFile);
|
||||
String checksum = KeyValueContainerUtil.computeCheckSum(containerId,
|
||||
tempContainerFile);
|
||||
containerCheckSumStream = new FileOutputStream(tempCheckSumFile);
|
||||
writer = new OutputStreamWriter(containerCheckSumStream, "UTF-8");
|
||||
writer.write(checksum);
|
||||
@ -308,43 +303,6 @@ private void updateContainerFile(File containerFile, File
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Compute checksum of the .container file.
|
||||
* @param containerFile
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
private String computeCheckSum(File containerFile) throws
|
||||
StorageContainerException {
|
||||
|
||||
MessageDigest sha;
|
||||
FileInputStream containerFileStream = null;
|
||||
try {
|
||||
sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
throw new StorageContainerException("Unable to create Message Digest,"
|
||||
+ " usually this is a java configuration issue.",
|
||||
NO_SUCH_ALGORITHM);
|
||||
}
|
||||
|
||||
try {
|
||||
containerFileStream = new FileInputStream(containerFile);
|
||||
byte[] byteArray = new byte[1024];
|
||||
int bytesCount = 0;
|
||||
|
||||
while ((bytesCount = containerFileStream.read(byteArray)) != -1) {
|
||||
sha.update(byteArray, 0, bytesCount);
|
||||
}
|
||||
String checksum = DigestUtils.sha256Hex(sha.digest());
|
||||
return checksum;
|
||||
} catch (IOException ex) {
|
||||
throw new StorageContainerException("Error during update of " +
|
||||
"check sum file. Container Name: " + containerData.getContainerId(),
|
||||
ex, CONTAINER_CHECKSUM_ERROR);
|
||||
} finally {
|
||||
IOUtils.closeStream(containerFileStream);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(boolean forceDelete)
|
||||
throws StorageContainerException {
|
||||
|
@ -18,12 +18,14 @@
|
||||
|
||||
package org.apache.hadoop.ozone.container.keyvalue;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
||||
import org.yaml.snakeyaml.nodes.Tag;
|
||||
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
@ -33,6 +35,14 @@
|
||||
*/
|
||||
public class KeyValueContainerData extends ContainerData {
|
||||
|
||||
// Yaml Tag used for KeyValueContainerData.
|
||||
public static final Tag YAML_TAG = new Tag("KeyValueContainerData");
|
||||
|
||||
// Fields need to be stored in .container file.
|
||||
public static final List<String> YAML_FIELDS = Lists.newArrayList(
|
||||
"containerType", "containerId", "layOutVersion", "state", "metadata",
|
||||
"metadataPath", "chunksPath", "containerDBType");
|
||||
|
||||
// Path to Container metadata Level DB/RocksDB Store and .container file.
|
||||
private String metadataPath;
|
||||
|
||||
@ -49,23 +59,21 @@ public class KeyValueContainerData extends ContainerData {
|
||||
|
||||
/**
|
||||
* Constructs KeyValueContainerData object.
|
||||
* @param type - containerType
|
||||
* @param id - ContainerId
|
||||
*/
|
||||
public KeyValueContainerData(ContainerProtos.ContainerType type, long id) {
|
||||
super(type, id);
|
||||
public KeyValueContainerData(long id) {
|
||||
super(ContainerProtos.ContainerType.KeyValueContainer, id);
|
||||
this.numPendingDeletionBlocks = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs KeyValueContainerData object.
|
||||
* @param type - containerType
|
||||
* @param id - ContainerId
|
||||
* @param layOutVersion
|
||||
*/
|
||||
public KeyValueContainerData(ContainerProtos.ContainerType type, long id,
|
||||
public KeyValueContainerData(long id,
|
||||
int layOutVersion) {
|
||||
super(type, id, layOutVersion);
|
||||
super(ContainerProtos.ContainerType.KeyValueContainer, id, layOutVersion);
|
||||
this.numPendingDeletionBlocks = 0;
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.ozone.container.keyvalue;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.sun.jersey.spi.resource.Singleton;
|
||||
@ -93,16 +94,16 @@ public class KeyValueHandler extends Handler {
|
||||
// TODO : Add metrics and populate it.
|
||||
|
||||
public static KeyValueHandler getInstance(Configuration config,
|
||||
ContainerSet contSet, VolumeSet volSet, String scmID) {
|
||||
ContainerSet contSet, VolumeSet volSet) {
|
||||
if (INSTANCE == null) {
|
||||
INSTANCE = new KeyValueHandler(config, contSet, volSet, scmID);
|
||||
INSTANCE = new KeyValueHandler(config, contSet, volSet);
|
||||
}
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
private KeyValueHandler(Configuration config, ContainerSet contSet,
|
||||
VolumeSet volSet, String scmID) {
|
||||
super(config, contSet, volSet, scmID);
|
||||
VolumeSet volSet) {
|
||||
super(config, contSet, volSet);
|
||||
containerType = ContainerType.KeyValueContainer;
|
||||
keyManager = new KeyManagerImpl(config);
|
||||
chunkManager = new ChunkManagerImpl();
|
||||
@ -156,6 +157,16 @@ public ContainerCommandResponseProto handle(
|
||||
return null;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ChunkManager getChunkManager() {
|
||||
return this.chunkManager;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public KeyManager getKeyManager() {
|
||||
return this.keyManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles Create Container Request. If successful, adds the container to
|
||||
* ContainerSet.
|
||||
@ -180,7 +191,7 @@ ContainerCommandResponseProto handleCreateContainer(
|
||||
}
|
||||
|
||||
KeyValueContainerData newContainerData = new KeyValueContainerData(
|
||||
containerType, containerID);
|
||||
containerID);
|
||||
// TODO: Add support to add metadataList to ContainerData. Add metadata
|
||||
// to container during creation.
|
||||
KeyValueContainer newContainer = new KeyValueContainer(
|
||||
@ -262,7 +273,6 @@ ContainerCommandResponseProto handleDeleteContainer(
|
||||
|
||||
boolean forceDelete = request.getDeleteContainer().getForceDelete();
|
||||
kvContainer.writeLock();
|
||||
|
||||
try {
|
||||
// Check if container is open
|
||||
if (kvContainer.getContainerData().isOpen()) {
|
||||
|
@ -18,9 +18,11 @@
|
||||
package org.apache.hadoop.ozone.container.keyvalue.helpers;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||
.ContainerCommandRequestProto;
|
||||
@ -28,15 +30,29 @@
|
||||
.ContainerCommandResponseProto;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||
.StorageContainerException;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||
import org.apache.hadoop.utils.MetadataKeyFilters;
|
||||
import org.apache.hadoop.utils.MetadataStore;
|
||||
import org.apache.hadoop.utils.MetadataStoreBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
|
||||
|
||||
/**
|
||||
* Class which defines utility methods for KeyValueContainer.
|
||||
@ -170,4 +186,122 @@ public static ContainerCommandResponseProto getReadContainerResponse(
|
||||
builder.setReadContainer(response);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute checksum of the .container file.
|
||||
* @param containerId
|
||||
* @param containerFile
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
public static String computeCheckSum(long containerId, File
|
||||
containerFile) throws StorageContainerException {
|
||||
Preconditions.checkNotNull(containerFile, "containerFile cannot be null");
|
||||
MessageDigest sha;
|
||||
FileInputStream containerFileStream = null;
|
||||
try {
|
||||
sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
throw new StorageContainerException("Unable to create Message Digest, " +
|
||||
"usually this is a java configuration issue.", NO_SUCH_ALGORITHM);
|
||||
}
|
||||
try {
|
||||
containerFileStream = new FileInputStream(containerFile);
|
||||
byte[] byteArray = new byte[1024];
|
||||
int bytesCount = 0;
|
||||
while ((bytesCount = containerFileStream.read(byteArray)) != -1) {
|
||||
sha.update(byteArray, 0, bytesCount);
|
||||
}
|
||||
String checksum = DigestUtils.sha256Hex(sha.digest());
|
||||
return checksum;
|
||||
} catch (IOException ex) {
|
||||
throw new StorageContainerException("Error during computing checksum: " +
|
||||
"for container " + containerId, ex, CONTAINER_CHECKSUM_ERROR);
|
||||
} finally {
|
||||
IOUtils.closeStream(containerFileStream);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify checksum of the container.
|
||||
* @param containerId
|
||||
* @param checksumFile
|
||||
* @param checksum
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
public static void verifyCheckSum(long containerId, File checksumFile,
|
||||
String checksum)
|
||||
throws StorageContainerException {
|
||||
try {
|
||||
Preconditions.checkNotNull(checksum);
|
||||
Preconditions.checkNotNull(checksumFile);
|
||||
Path path = Paths.get(checksumFile.getAbsolutePath());
|
||||
List<String> fileCheckSum = Files.readAllLines(path);
|
||||
Preconditions.checkState(fileCheckSum.size() == 1, "checksum " +
|
||||
"should be 32 byte string");
|
||||
if (!checksum.equals(fileCheckSum.get(0))) {
|
||||
LOG.error("Checksum mismatch for the container {}", containerId);
|
||||
throw new StorageContainerException("Checksum mismatch for " +
|
||||
"the container " + containerId, CHECKSUM_MISMATCH);
|
||||
}
|
||||
} catch (StorageContainerException ex) {
|
||||
throw ex;
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Error during verify checksum for container {}", containerId);
|
||||
throw new StorageContainerException("Error during verify checksum" +
|
||||
" for container " + containerId, IO_EXCEPTION);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse KeyValueContainerData and verify checksum.
|
||||
* @param containerData
|
||||
* @param containerFile
|
||||
* @param checksumFile
|
||||
* @param dbFile
|
||||
* @param config
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void parseKeyValueContainerData(
|
||||
KeyValueContainerData containerData, File containerFile, File
|
||||
checksumFile, File dbFile, OzoneConfiguration config) throws IOException {
|
||||
|
||||
Preconditions.checkNotNull(containerData, "containerData cannot be null");
|
||||
Preconditions.checkNotNull(containerFile, "containerFile cannot be null");
|
||||
Preconditions.checkNotNull(checksumFile, "checksumFile cannot be null");
|
||||
Preconditions.checkNotNull(dbFile, "dbFile cannot be null");
|
||||
Preconditions.checkNotNull(config, "ozone config cannot be null");
|
||||
|
||||
long containerId = containerData.getContainerId();
|
||||
String containerName = String.valueOf(containerId);
|
||||
File metadataPath = new File(containerData.getMetadataPath());
|
||||
|
||||
Preconditions.checkNotNull(containerName, "container Name cannot be " +
|
||||
"null");
|
||||
Preconditions.checkNotNull(metadataPath, "metadata path cannot be " +
|
||||
"null");
|
||||
|
||||
// Verify Checksum
|
||||
String checksum = KeyValueContainerUtil.computeCheckSum(
|
||||
containerData.getContainerId(), containerFile);
|
||||
KeyValueContainerUtil.verifyCheckSum(containerId, checksumFile, checksum);
|
||||
|
||||
containerData.setDbFile(dbFile);
|
||||
|
||||
MetadataStore metadata = KeyUtils.getDB(containerData, config);
|
||||
long bytesUsed = 0;
|
||||
List<Map.Entry<byte[], byte[]>> liveKeys = metadata
|
||||
.getRangeKVs(null, Integer.MAX_VALUE,
|
||||
MetadataKeyFilters.getNormalKeyFilter());
|
||||
bytesUsed = liveKeys.parallelStream().mapToLong(e-> {
|
||||
KeyData keyData;
|
||||
try {
|
||||
keyData = KeyUtils.getKeyData(e.getValue());
|
||||
return keyData.getSize();
|
||||
} catch (IOException ex) {
|
||||
return 0L;
|
||||
}
|
||||
}).sum();
|
||||
containerData.setBytesUsed(bytesUsed);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,157 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.ozoneimpl;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.common.Storage;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileFilter;
|
||||
import java.io.IOException;
|
||||
|
||||
|
||||
/**
|
||||
* Class used to read .container files from Volume and build container map.
|
||||
*/
|
||||
public class ContainerReader implements Runnable {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
ContainerReader.class);
|
||||
private File hddsVolumeDir;
|
||||
private final ContainerSet containerSet;
|
||||
private final OzoneConfiguration config;
|
||||
|
||||
ContainerReader(File volumeRoot, ContainerSet cset, OzoneConfiguration conf) {
|
||||
Preconditions.checkNotNull(volumeRoot);
|
||||
this.hddsVolumeDir = volumeRoot;
|
||||
this.containerSet = cset;
|
||||
this.config = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
readVolume(hddsVolumeDir);
|
||||
} catch (RuntimeException ex) {
|
||||
LOG.info("Caught an Run time exception during reading container files" +
|
||||
" from Volume {}", hddsVolumeDir);
|
||||
}
|
||||
}
|
||||
|
||||
public void readVolume(File hddsVolumeRootDir) {
|
||||
Preconditions.checkNotNull(hddsVolumeRootDir, "hddsVolumeRootDir" +
|
||||
"cannot be null");
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* layout of the container directory on the disk.
|
||||
* /hdds/<<scmUuid>>/current/<<containerdir>>/</containerID>/metadata
|
||||
* /<<containerID>>.container
|
||||
* /hdds/<<scmUuid>>/current/<<containerdir>>/<<containerID>>/metadata
|
||||
* /<<containerID>>.checksum
|
||||
* /hdds/<<scmUuid>>/current/<<containerdir>>/<<containerID>>/metadata
|
||||
* /<<containerID>>.db
|
||||
* /hdds/<<scmUuid>>/current/<<containerdir>>/<<containerID>>/chunks
|
||||
* /<<chunkFile>>
|
||||
*
|
||||
**/
|
||||
|
||||
//filtering scm directory
|
||||
File[] scmDir = hddsVolumeRootDir.listFiles(new FileFilter() {
|
||||
@Override
|
||||
public boolean accept(File pathname) {
|
||||
return pathname.isDirectory();
|
||||
}
|
||||
});
|
||||
|
||||
for (File scmLoc : scmDir) {
|
||||
File currentDir = null;
|
||||
currentDir = new File(scmLoc, Storage.STORAGE_DIR_CURRENT);
|
||||
File[] containerTopDirs = currentDir.listFiles();
|
||||
if (containerTopDirs != null) {
|
||||
for (File containerTopDir : containerTopDirs) {
|
||||
if (containerTopDir.isDirectory()) {
|
||||
File[] containerDirs = containerTopDir.listFiles();
|
||||
for (File containerDir : containerDirs) {
|
||||
File metadataPath = new File(containerDir + File.separator +
|
||||
OzoneConsts.CONTAINER_META_PATH);
|
||||
String containerName = containerDir.getName();
|
||||
if (metadataPath.exists()) {
|
||||
File containerFile = KeyValueContainerLocationUtil
|
||||
.getContainerFile(metadataPath, containerName);
|
||||
File checksumFile = KeyValueContainerLocationUtil
|
||||
.getContainerCheckSumFile(metadataPath, containerName);
|
||||
File dbFile = KeyValueContainerLocationUtil
|
||||
.getContainerDBFile(metadataPath, containerName);
|
||||
if (containerFile.exists() && checksumFile.exists() &&
|
||||
dbFile.exists()) {
|
||||
verifyContainerFile(containerFile, checksumFile, dbFile);
|
||||
} else {
|
||||
LOG.error("Missing container metadata files for Container: " +
|
||||
"{}", containerName);
|
||||
}
|
||||
} else {
|
||||
LOG.error("Missing container metadata directory for " +
|
||||
"Container: {}", containerName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyContainerFile(File containerFile, File checksumFile,
|
||||
File dbFile) {
|
||||
try {
|
||||
ContainerData containerData = ContainerDataYaml.readContainerFile(
|
||||
containerFile);
|
||||
|
||||
switch (containerData.getContainerType()) {
|
||||
case KeyValueContainer:
|
||||
KeyValueContainerData keyValueContainerData = (KeyValueContainerData)
|
||||
containerData;
|
||||
KeyValueContainerUtil.parseKeyValueContainerData(keyValueContainerData,
|
||||
containerFile, checksumFile, dbFile, config);
|
||||
KeyValueContainer keyValueContainer = new KeyValueContainer(
|
||||
keyValueContainerData, config);
|
||||
containerSet.addContainer(keyValueContainer);
|
||||
break;
|
||||
default:
|
||||
LOG.error("Unrecognized ContainerType {} format during verify " +
|
||||
"ContainerFile", containerData.getContainerType());
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Error during reading container file {}", containerFile);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,72 +1,49 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.ozoneimpl;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
|
||||
import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
|
||||
import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
||||
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine.background
|
||||
.BlockDeletingService;
|
||||
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
|
||||
import org.apache.hadoop.ozone.container.common.transport.server
|
||||
.XceiverServerGrpc;
|
||||
import org.apache.hadoop.ozone.container.common.transport.server
|
||||
.XceiverServerSpi;
|
||||
import org.apache.hadoop.ozone.container.common.transport.server.ratis
|
||||
.XceiverServerRatis;
|
||||
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
|
||||
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
|
||||
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
|
||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
|
||||
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
|
||||
|
||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.io.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ROOT_PREFIX;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
|
||||
|
||||
/**
|
||||
@ -74,69 +51,75 @@
|
||||
* layer.
|
||||
*/
|
||||
public class OzoneContainer {
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(OzoneContainer.class);
|
||||
|
||||
private final Configuration ozoneConfig;
|
||||
private final ContainerDispatcher dispatcher;
|
||||
private final ContainerManager manager;
|
||||
public static final Logger LOG = LoggerFactory.getLogger(
|
||||
OzoneContainer.class);
|
||||
|
||||
private final HddsDispatcher hddsDispatcher;
|
||||
private final DatanodeDetails dnDetails;
|
||||
private final OzoneConfiguration config;
|
||||
private final VolumeSet volumeSet;
|
||||
private final ContainerSet containerSet;
|
||||
private final XceiverServerSpi[] server;
|
||||
private final ChunkManager chunkManager;
|
||||
private final KeyManager keyManager;
|
||||
private final BlockDeletingService blockDeletingService;
|
||||
|
||||
/**
|
||||
* Creates a network endpoint and enables Ozone container.
|
||||
*
|
||||
* @param ozoneConfig - Config
|
||||
* Construct OzoneContainer object.
|
||||
* @param datanodeDetails
|
||||
* @param conf
|
||||
* @throws DiskOutOfSpaceException
|
||||
* @throws IOException
|
||||
*/
|
||||
public OzoneContainer(
|
||||
DatanodeDetails datanodeDetails, Configuration ozoneConfig)
|
||||
throws IOException {
|
||||
this.ozoneConfig = ozoneConfig;
|
||||
List<StorageLocation> locations = new LinkedList<>();
|
||||
String[] paths = ozoneConfig.getStrings(
|
||||
OzoneConfigKeys.OZONE_METADATA_DIRS);
|
||||
if (paths != null && paths.length > 0) {
|
||||
for (String p : paths) {
|
||||
locations.add(StorageLocation.parse(
|
||||
Paths.get(p).resolve(CONTAINER_ROOT_PREFIX).toString()));
|
||||
}
|
||||
} else {
|
||||
getDataDir(locations);
|
||||
}
|
||||
|
||||
manager = new ContainerManagerImpl();
|
||||
manager.init(this.ozoneConfig, locations, datanodeDetails);
|
||||
this.chunkManager = new ChunkManagerImpl(manager);
|
||||
manager.setChunkManager(this.chunkManager);
|
||||
|
||||
this.keyManager = new KeyManagerImpl(manager, ozoneConfig);
|
||||
manager.setKeyManager(this.keyManager);
|
||||
|
||||
long svcInterval =
|
||||
ozoneConfig.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
|
||||
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
long serviceTimeout = ozoneConfig.getTimeDuration(
|
||||
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
|
||||
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
this.blockDeletingService = new BlockDeletingService(manager,
|
||||
svcInterval, serviceTimeout, ozoneConfig);
|
||||
|
||||
this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
|
||||
|
||||
boolean useGrpc = this.ozoneConfig.getBoolean(
|
||||
public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
|
||||
conf) throws IOException {
|
||||
this.dnDetails = datanodeDetails;
|
||||
this.config = conf;
|
||||
this.volumeSet = new VolumeSet(datanodeDetails, conf);
|
||||
this.containerSet = new ContainerSet();
|
||||
boolean useGrpc = this.config.getBoolean(
|
||||
ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
|
||||
ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT);
|
||||
buildContainerSet();
|
||||
hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet);
|
||||
server = new XceiverServerSpi[]{
|
||||
useGrpc ? new XceiverServerGrpc(datanodeDetails,
|
||||
this.ozoneConfig, this.dispatcher) :
|
||||
useGrpc ? new XceiverServerGrpc(datanodeDetails, this.config, this
|
||||
.hddsDispatcher) :
|
||||
new XceiverServer(datanodeDetails,
|
||||
this.ozoneConfig, this.dispatcher),
|
||||
XceiverServerRatis
|
||||
.newXceiverServerRatis(datanodeDetails, this.ozoneConfig, dispatcher)
|
||||
this.config, this.hddsDispatcher),
|
||||
XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this
|
||||
.config, hddsDispatcher)
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Build's container map.
|
||||
*/
|
||||
public void buildContainerSet() {
|
||||
Iterator<HddsVolume> volumeSetIterator = volumeSet.getVolumesList()
|
||||
.iterator();
|
||||
ArrayList<Thread> volumeThreads = new ArrayList<Thread>();
|
||||
|
||||
//TODO: diskchecker should be run before this, to see how disks are.
|
||||
// And also handle disk failure tolerance need to be added
|
||||
while (volumeSetIterator.hasNext()) {
|
||||
HddsVolume volume = volumeSetIterator.next();
|
||||
File hddsVolumeRootDir = volume.getHddsRootDir();
|
||||
Thread thread = new Thread(new ContainerReader(hddsVolumeRootDir,
|
||||
containerSet, config));
|
||||
thread.start();
|
||||
volumeThreads.add(thread);
|
||||
}
|
||||
|
||||
try {
|
||||
for (int i = 0; i < volumeThreads.size(); i++) {
|
||||
volumeThreads.get(i).join();
|
||||
}
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.info("Volume Threads Interrupted exception", ex);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
@ -145,155 +128,46 @@ public OzoneContainer(
|
||||
* @throws IOException
|
||||
*/
|
||||
public void start() throws IOException {
|
||||
LOG.info("Attempting to start container services.");
|
||||
for (XceiverServerSpi serverinstance : server) {
|
||||
serverinstance.start();
|
||||
}
|
||||
blockDeletingService.start();
|
||||
dispatcher.init();
|
||||
hddsDispatcher.init();
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the ozone container.
|
||||
* <p>
|
||||
* Shutdown logic is not very obvious from the following code. if you need to
|
||||
* modify the logic, please keep these comments in mind. Here is the shutdown
|
||||
* sequence.
|
||||
* <p>
|
||||
* 1. We shutdown the network ports.
|
||||
* <p>
|
||||
* 2. Now we need to wait for all requests in-flight to finish.
|
||||
* <p>
|
||||
* 3. The container manager lock is a read-write lock with "Fairness"
|
||||
* enabled.
|
||||
* <p>
|
||||
* 4. This means that the waiting threads are served in a "first-come-first
|
||||
* -served" manner. Please note that this applies to waiting threads only.
|
||||
* <p>
|
||||
* 5. Since write locks are exclusive, if we are waiting to get a lock it
|
||||
* implies that we are waiting for in-flight operations to complete.
|
||||
* <p>
|
||||
* 6. if there are other write operations waiting on the reader-writer lock,
|
||||
* fairness guarantees that they will proceed before the shutdown lock
|
||||
* request.
|
||||
* <p>
|
||||
* 7. Since all operations either take a reader or writer lock of container
|
||||
* manager, we are guaranteed that we are the last operation since we have
|
||||
* closed the network port, and we wait until close is successful.
|
||||
* <p>
|
||||
* 8. We take the writer lock and call shutdown on each of the managers in
|
||||
* reverse order. That is chunkManager, keyManager and containerManager is
|
||||
* shutdown.
|
||||
* Stop Container Service on the datanode.
|
||||
*/
|
||||
public void stop() {
|
||||
//TODO: at end of container IO integration work.
|
||||
LOG.info("Attempting to stop container services.");
|
||||
for(XceiverServerSpi serverinstance: server) {
|
||||
serverinstance.stop();
|
||||
}
|
||||
dispatcher.shutdown();
|
||||
|
||||
try {
|
||||
this.manager.writeLock();
|
||||
this.chunkManager.shutdown();
|
||||
this.keyManager.shutdown();
|
||||
this.manager.shutdown();
|
||||
this.blockDeletingService.shutdown();
|
||||
LOG.info("container services shutdown complete.");
|
||||
} catch (IOException ex) {
|
||||
LOG.warn("container service shutdown error:", ex);
|
||||
} finally {
|
||||
this.manager.writeUnlock();
|
||||
}
|
||||
hddsDispatcher.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a paths to data dirs.
|
||||
*
|
||||
* @param pathList - List of paths.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void getDataDir(List<StorageLocation> pathList) throws IOException {
|
||||
for (String dir : ozoneConfig.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
|
||||
StorageLocation location = StorageLocation.parse(dir);
|
||||
pathList.add(location);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns node report of container storage usage.
|
||||
*/
|
||||
public NodeReportProto getNodeReport() throws IOException {
|
||||
return this.manager.getNodeReport();
|
||||
@VisibleForTesting
|
||||
public ContainerSet getContainerSet() {
|
||||
return containerSet;
|
||||
}
|
||||
|
||||
private int getPortbyType(HddsProtos.ReplicationType replicationType) {
|
||||
for (XceiverServerSpi serverinstance : server) {
|
||||
if (serverinstance.getServerType() == replicationType) {
|
||||
return serverinstance.getIPCPort();
|
||||
}
|
||||
}
|
||||
return INVALID_PORT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the container server IPC port.
|
||||
*
|
||||
* @return Container server IPC port.
|
||||
*/
|
||||
public int getContainerServerPort() {
|
||||
return getPortbyType(HddsProtos.ReplicationType.STAND_ALONE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Ratis container Server IPC port.
|
||||
*
|
||||
* @return Ratis port.
|
||||
*/
|
||||
public int getRatisContainerServerPort() {
|
||||
return getPortbyType(HddsProtos.ReplicationType.RATIS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns container report.
|
||||
* @return - container report.
|
||||
* @throws IOException
|
||||
*/
|
||||
public ContainerReportsProto getContainerReport() throws IOException {
|
||||
return this.manager.getContainerReport();
|
||||
public StorageContainerDatanodeProtocolProtos.ContainerReportsProto
|
||||
getContainerReport() throws IOException {
|
||||
return this.containerSet.getContainerReport();
|
||||
}
|
||||
|
||||
// TODO: remove getContainerReports
|
||||
/**
|
||||
* Returns the list of closed containers.
|
||||
* @return - List of closed containers.
|
||||
* Submit ContainerRequest.
|
||||
* @param request
|
||||
* @param replicationType
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<ContainerData> getClosedContainerReports() throws IOException {
|
||||
return this.manager.getClosedContainerReports();
|
||||
}
|
||||
|
||||
private XceiverServerSpi getRatisSerer() {
|
||||
for (XceiverServerSpi serverInstance : server) {
|
||||
if (serverInstance instanceof XceiverServerRatis) {
|
||||
return serverInstance;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private XceiverServerSpi getStandaAloneSerer() {
|
||||
for (XceiverServerSpi serverInstance : server) {
|
||||
if (!(serverInstance instanceof XceiverServerRatis)) {
|
||||
return serverInstance;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ContainerManager getContainerManager() {
|
||||
return this.manager;
|
||||
}
|
||||
|
||||
public void submitContainerRequest(
|
||||
ContainerProtos.ContainerCommandRequestProto request,
|
||||
HddsProtos.ReplicationType replicationType) throws IOException {
|
||||
@ -332,4 +206,66 @@ private long getContainerIdForCmd(
|
||||
+ " not supported over HearBeat Response");
|
||||
}
|
||||
}
|
||||
|
||||
private XceiverServerSpi getRatisSerer() {
|
||||
for (XceiverServerSpi serverInstance : server) {
|
||||
if (serverInstance instanceof XceiverServerRatis) {
|
||||
return serverInstance;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private XceiverServerSpi getStandaAloneSerer() {
|
||||
for (XceiverServerSpi serverInstance : server) {
|
||||
if (!(serverInstance instanceof XceiverServerRatis)) {
|
||||
return serverInstance;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private int getPortbyType(HddsProtos.ReplicationType replicationType) {
|
||||
for (XceiverServerSpi serverinstance : server) {
|
||||
if (serverinstance.getServerType() == replicationType) {
|
||||
return serverinstance.getIPCPort();
|
||||
}
|
||||
}
|
||||
return INVALID_PORT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the container server IPC port.
|
||||
*
|
||||
* @return Container server IPC port.
|
||||
*/
|
||||
public int getContainerServerPort() {
|
||||
return getPortbyType(HddsProtos.ReplicationType.STAND_ALONE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Ratis container Server IPC port.
|
||||
*
|
||||
* @return Ratis port.
|
||||
*/
|
||||
public int getRatisContainerServerPort() {
|
||||
return getPortbyType(HddsProtos.ReplicationType.RATIS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns node report of container storage usage.
|
||||
*/
|
||||
public StorageContainerDatanodeProtocolProtos.NodeReportProto getNodeReport()
|
||||
throws IOException {
|
||||
return volumeSet.getNodeReport();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ContainerDispatcher getDispatcher() {
|
||||
return this.hddsDispatcher;
|
||||
}
|
||||
|
||||
public VolumeSet getVolumeSet() {
|
||||
return volumeSet;
|
||||
}
|
||||
}
|
@ -105,6 +105,10 @@ public SCMVersionResponseProto getProtobufMessage() {
|
||||
.addAllKeys(list).build();
|
||||
}
|
||||
|
||||
public String getValue(String key) {
|
||||
return this.values.get(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Builder class.
|
||||
*/
|
||||
|
@ -25,16 +25,20 @@
|
||||
.StorageContainerDatanodeProtocolService;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
||||
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
|
||||
import org.apache.hadoop.ozone.protocolPB
|
||||
.StorageContainerDatanodeProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
|
||||
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
|
||||
|
||||
/**
|
||||
* Test Endpoint class.
|
||||
*/
|
||||
@ -109,8 +113,13 @@ public static InetSocketAddress getReuseableAddress() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
public static Configuration getConf() {
|
||||
return new Configuration();
|
||||
public static OzoneConfiguration getConf() {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
conf.set(HDDS_DATANODE_DIR_KEY, GenericTestUtils
|
||||
.getRandomizedTempPath());
|
||||
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, GenericTestUtils
|
||||
.getRandomizedTempPath());
|
||||
return conf;
|
||||
}
|
||||
|
||||
public static OzoneConfiguration getOzoneConf() {
|
||||
|
@ -38,6 +38,7 @@
|
||||
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.VersionResponse;
|
||||
|
||||
@ -151,7 +152,10 @@ public long getBytesUsed() {
|
||||
return VersionResponse.newBuilder()
|
||||
.setVersion(versionInfo.getVersion())
|
||||
.addValue(VersionInfo.DESCRIPTION_KEY, versionInfo.getDescription())
|
||||
.addValue(OzoneConsts.SCM_ID, UUID.randomUUID().toString())
|
||||
.addValue(OzoneConsts.CLUSTER_ID, UUID.randomUUID().toString())
|
||||
.build().getProtobufMessage();
|
||||
|
||||
}
|
||||
|
||||
private void sleepIfNeeded() {
|
||||
|
@ -42,8 +42,7 @@ public void testKeyValueData() {
|
||||
.ContainerLifeCycleState.CLOSED;
|
||||
AtomicLong val = new AtomicLong(0);
|
||||
|
||||
KeyValueContainerData kvData = new KeyValueContainerData(containerType,
|
||||
containerId);
|
||||
KeyValueContainerData kvData = new KeyValueContainerData(containerId);
|
||||
|
||||
assertEquals(containerType, kvData.getContainerType());
|
||||
assertEquals(containerId, kvData.getContainerId());
|
||||
|
@ -22,7 +22,6 @@
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueYaml;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -36,7 +35,7 @@
|
||||
/**
|
||||
* This class tests create/read .container files.
|
||||
*/
|
||||
public class TestKeyValueYaml {
|
||||
public class TestContainerDataYaml {
|
||||
|
||||
@Test
|
||||
public void testCreateContainerFile() throws IOException {
|
||||
@ -46,8 +45,7 @@ public void testCreateContainerFile() throws IOException {
|
||||
File filePath = new File(new FileSystemTestHelper().getTestRootDir());
|
||||
filePath.mkdirs();
|
||||
|
||||
KeyValueContainerData keyValueContainerData = new KeyValueContainerData(
|
||||
ContainerProtos.ContainerType.KeyValueContainer, Long.MAX_VALUE);
|
||||
KeyValueContainerData keyValueContainerData = new KeyValueContainerData(Long.MAX_VALUE);
|
||||
keyValueContainerData.setContainerDBType("RocksDB");
|
||||
keyValueContainerData.setMetadataPath(path);
|
||||
keyValueContainerData.setChunksPath(path);
|
||||
@ -55,14 +53,15 @@ public void testCreateContainerFile() throws IOException {
|
||||
File containerFile = new File(filePath, containerPath);
|
||||
|
||||
// Create .container file with ContainerData
|
||||
KeyValueYaml.createContainerFile(containerFile, keyValueContainerData);
|
||||
ContainerDataYaml.createContainerFile(ContainerProtos.ContainerType
|
||||
.KeyValueContainer, containerFile, keyValueContainerData);
|
||||
|
||||
//Check .container file exists or not.
|
||||
assertTrue(containerFile.exists());
|
||||
|
||||
// Read from .container file, and verify data.
|
||||
KeyValueContainerData kvData = KeyValueYaml.readContainerFile(
|
||||
containerFile);
|
||||
KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml
|
||||
.readContainerFile(containerFile);
|
||||
assertEquals(Long.MAX_VALUE, kvData.getContainerId());
|
||||
assertEquals(ContainerProtos.ContainerType.KeyValueContainer, kvData
|
||||
.getContainerType());
|
||||
@ -82,10 +81,12 @@ public void testCreateContainerFile() throws IOException {
|
||||
|
||||
// Update .container file with new ContainerData.
|
||||
containerFile = new File(filePath, containerPath);
|
||||
KeyValueYaml.createContainerFile(containerFile, kvData);
|
||||
ContainerDataYaml.createContainerFile(ContainerProtos.ContainerType
|
||||
.KeyValueContainer, containerFile, kvData);
|
||||
|
||||
// Reading newly updated data from .container file
|
||||
kvData = KeyValueYaml.readContainerFile(containerFile);
|
||||
kvData = (KeyValueContainerData) ContainerDataYaml.readContainerFile(
|
||||
containerFile);
|
||||
|
||||
// verify data.
|
||||
assertEquals(Long.MAX_VALUE, kvData.getContainerId());
|
||||
@ -113,7 +114,8 @@ public void testIncorrectContainerFile() throws IOException{
|
||||
//Get file from resources folder
|
||||
ClassLoader classLoader = getClass().getClassLoader();
|
||||
File file = new File(classLoader.getResource(path).getFile());
|
||||
KeyValueContainerData kvData = KeyValueYaml.readContainerFile(file);
|
||||
KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml
|
||||
.readContainerFile(file);
|
||||
fail("testIncorrectContainerFile failed");
|
||||
} catch (IllegalStateException ex) {
|
||||
GenericTestUtils.assertExceptionContains("Unexpected " +
|
||||
@ -135,7 +137,8 @@ public void testCheckBackWardCompatabilityOfContainerFile() throws
|
||||
//Get file from resources folder
|
||||
ClassLoader classLoader = getClass().getClassLoader();
|
||||
File file = new File(classLoader.getResource(path).getFile());
|
||||
KeyValueContainerData kvData = KeyValueYaml.readContainerFile(file);
|
||||
KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml
|
||||
.readContainerFile(file);
|
||||
|
||||
//Checking the Container file data is consistent or not
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED, kvData
|
@ -53,8 +53,7 @@ public void testAddGetRemoveContainer() throws StorageContainerException {
|
||||
ContainerProtos.ContainerLifeCycleState state = ContainerProtos
|
||||
.ContainerLifeCycleState.CLOSED;
|
||||
|
||||
KeyValueContainerData kvData = new KeyValueContainerData(
|
||||
ContainerProtos.ContainerType.KeyValueContainer, containerId);
|
||||
KeyValueContainerData kvData = new KeyValueContainerData(containerId);
|
||||
kvData.setState(state);
|
||||
KeyValueContainer keyValueContainer = new KeyValueContainer(kvData, new
|
||||
OzoneConfiguration());
|
||||
@ -164,8 +163,7 @@ public void testListContainer() throws StorageContainerException {
|
||||
private ContainerSet createContainerSet() throws StorageContainerException {
|
||||
ContainerSet containerSet = new ContainerSet();
|
||||
for (int i=0; i<10; i++) {
|
||||
KeyValueContainerData kvData = new KeyValueContainerData(
|
||||
ContainerProtos.ContainerType.KeyValueContainer, i);
|
||||
KeyValueContainerData kvData = new KeyValueContainerData(i);
|
||||
if (i%2 == 0) {
|
||||
kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSED);
|
||||
} else {
|
||||
|
@ -52,7 +52,6 @@ public class TestHandler {
|
||||
private VolumeSet volumeSet;
|
||||
private Handler handler;
|
||||
|
||||
private final static String SCM_ID = UUID.randomUUID().toString();
|
||||
private final static String DATANODE_UUID = UUID.randomUUID().toString();
|
||||
|
||||
@Before
|
||||
@ -61,12 +60,12 @@ public void setup() throws Exception {
|
||||
this.containerSet = Mockito.mock(ContainerSet.class);
|
||||
this.volumeSet = Mockito.mock(VolumeSet.class);
|
||||
|
||||
this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, SCM_ID);
|
||||
this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetKeyValueHandler() throws Exception {
|
||||
Handler kvHandler = dispatcher.getHandlerForContainerType(
|
||||
Handler kvHandler = dispatcher.getHandler(
|
||||
ContainerProtos.ContainerType.KeyValueContainer);
|
||||
|
||||
Assert.assertTrue("getHandlerForContainerType returned incorrect handler",
|
||||
@ -83,8 +82,7 @@ public void testGetHandlerForInvalidContainerType() {
|
||||
Assert.assertEquals("New ContainerType detected. Not an invalid " +
|
||||
"containerType", invalidContainerType, null);
|
||||
|
||||
Handler handler = dispatcher.getHandlerForContainerType(
|
||||
invalidContainerType);
|
||||
Handler handler = dispatcher.getHandler(invalidContainerType);
|
||||
Assert.assertEquals("Get Handler for Invalid ContainerType should " +
|
||||
"return null.", handler, null);
|
||||
}
|
||||
|
@ -81,8 +81,7 @@ public void setUp() throws Exception {
|
||||
Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
|
||||
.thenReturn(hddsVolume);
|
||||
|
||||
keyValueContainerData = new KeyValueContainerData(
|
||||
ContainerProtos.ContainerType.KeyValueContainer, 1L);
|
||||
keyValueContainerData = new KeyValueContainerData(1L);
|
||||
|
||||
keyValueContainer = new KeyValueContainer(
|
||||
keyValueContainerData, config);
|
||||
|
@ -79,8 +79,7 @@ public void setUp() throws Exception {
|
||||
Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
|
||||
.thenReturn(hddsVolume);
|
||||
|
||||
keyValueContainerData = new KeyValueContainerData(
|
||||
ContainerProtos.ContainerType.KeyValueContainer, 1L);
|
||||
keyValueContainerData = new KeyValueContainerData(1L);
|
||||
|
||||
keyValueContainer = new KeyValueContainer(
|
||||
keyValueContainerData, config);
|
||||
|
@ -24,6 +24,7 @@
|
||||
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||
.StorageContainerException;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
|
||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
|
||||
import org.apache.hadoop.ozone.container.common.volume
|
||||
.RoundRobinVolumeChoosingPolicy;
|
||||
@ -85,8 +86,7 @@ public void setUp() throws Exception {
|
||||
Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
|
||||
.thenReturn(hddsVolume);
|
||||
|
||||
keyValueContainerData = new KeyValueContainerData(
|
||||
ContainerProtos.ContainerType.KeyValueContainer, 1L);
|
||||
keyValueContainerData = new KeyValueContainerData(1L);
|
||||
|
||||
keyValueContainer = new KeyValueContainer(
|
||||
keyValueContainerData, conf);
|
||||
@ -197,7 +197,8 @@ public void testCloseContainer() throws Exception {
|
||||
File containerFile = KeyValueContainerLocationUtil.getContainerFile(
|
||||
containerMetaDataLoc, containerName);
|
||||
|
||||
keyValueContainerData = KeyValueYaml.readContainerFile(containerFile);
|
||||
keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
|
||||
.readContainerFile(containerFile);
|
||||
assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED,
|
||||
keyValueContainerData.getState());
|
||||
}
|
||||
@ -237,7 +238,8 @@ public void testUpdateContainer() throws IOException {
|
||||
File containerFile = KeyValueContainerLocationUtil.getContainerFile(
|
||||
containerMetaDataLoc, containerName);
|
||||
|
||||
keyValueContainerData = KeyValueYaml.readContainerFile(containerFile);
|
||||
keyValueContainerData = (KeyValueContainerData) ContainerDataYaml
|
||||
.readContainerFile(containerFile);
|
||||
assertEquals(2, keyValueContainerData.getMetadata().size());
|
||||
|
||||
}
|
||||
|
@ -74,9 +74,10 @@ private void setup() throws Exception {
|
||||
.build();
|
||||
this.volumeSet = new VolumeSet(datanodeDetails, conf);
|
||||
|
||||
this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, SCM_ID);
|
||||
this.handler = (KeyValueHandler) dispatcher.getHandlerForContainerType(
|
||||
this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet);
|
||||
this.handler = (KeyValueHandler) dispatcher.getHandler(
|
||||
ContainerProtos.ContainerType.KeyValueContainer);
|
||||
dispatcher.setScmId(UUID.randomUUID().toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -87,8 +88,7 @@ public void testHandlerCommandHandling() throws Exception{
|
||||
// Create mock HddsDispatcher and KeyValueHandler.
|
||||
this.handler = Mockito.mock(KeyValueHandler.class);
|
||||
this.dispatcher = Mockito.mock(HddsDispatcher.class);
|
||||
Mockito.when(dispatcher.getHandlerForContainerType(any())).thenReturn
|
||||
(handler);
|
||||
Mockito.when(dispatcher.getHandler(any())).thenReturn(handler);
|
||||
Mockito.when(dispatcher.dispatch(any())).thenCallRealMethod();
|
||||
Mockito.when(dispatcher.getContainer(anyLong())).thenReturn(
|
||||
Mockito.mock(KeyValueContainer.class));
|
||||
|
@ -0,0 +1,108 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.ozoneimpl;
|
||||
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
||||
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
|
||||
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* This class is used to test OzoneContainer.
|
||||
*/
|
||||
public class TestOzoneContainer {
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder folder = new TemporaryFolder();
|
||||
|
||||
|
||||
private OzoneConfiguration conf;
|
||||
private String scmId = UUID.randomUUID().toString();
|
||||
private VolumeSet volumeSet;
|
||||
private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
|
||||
private KeyValueContainerData keyValueContainerData;
|
||||
private KeyValueContainer keyValueContainer;
|
||||
private final DatanodeDetails datanodeDetails = createDatanodeDetails();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf = new OzoneConfiguration();
|
||||
conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, folder.getRoot()
|
||||
.getAbsolutePath() + "," + folder.newFolder().getAbsolutePath());
|
||||
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, folder.newFolder().getAbsolutePath());
|
||||
volumeSet = new VolumeSet(datanodeDetails, conf);
|
||||
volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy();
|
||||
|
||||
for (int i=0; i<10; i++) {
|
||||
keyValueContainerData = new KeyValueContainerData(i);
|
||||
keyValueContainer = new KeyValueContainer(
|
||||
keyValueContainerData, conf);
|
||||
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuildContainerMap() throws Exception {
|
||||
OzoneContainer ozoneContainer = new
|
||||
OzoneContainer(datanodeDetails, conf);
|
||||
ContainerSet containerset = ozoneContainer.getContainerSet();
|
||||
assertEquals(10, containerset.containerCount());
|
||||
}
|
||||
|
||||
|
||||
private DatanodeDetails createDatanodeDetails() {
|
||||
Random random = new Random();
|
||||
String ipAddress =
|
||||
random.nextInt(256) + "." + random.nextInt(256) + "." + random
|
||||
.nextInt(256) + "." + random.nextInt(256);
|
||||
|
||||
String uuid = UUID.randomUUID().toString();
|
||||
String hostName = uuid;
|
||||
DatanodeDetails.Port containerPort = DatanodeDetails.newPort(
|
||||
DatanodeDetails.Port.Name.STANDALONE, 0);
|
||||
DatanodeDetails.Port ratisPort = DatanodeDetails.newPort(
|
||||
DatanodeDetails.Port.Name.RATIS, 0);
|
||||
DatanodeDetails.Port restPort = DatanodeDetails.newPort(
|
||||
DatanodeDetails.Port.Name.REST, 0);
|
||||
DatanodeDetails.Builder builder = DatanodeDetails.newBuilder();
|
||||
builder.setUuid(uuid)
|
||||
.setHostName("localhost")
|
||||
.setIpAddress(ipAddress)
|
||||
.addPort(containerPort)
|
||||
.addPort(ratisPort)
|
||||
.addPort(restPort);
|
||||
return builder.build();
|
||||
}
|
||||
}
|
@ -40,6 +40,7 @@
|
||||
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.VersionResponse;
|
||||
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
|
||||
@ -703,6 +704,9 @@ long getLastHBProcessedCount() {
|
||||
public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
|
||||
return VersionResponse.newBuilder()
|
||||
.setVersion(this.version.getVersion())
|
||||
.addValue(OzoneConsts.SCM_ID, this.scmManager.getScmStorage().getScmId())
|
||||
.addValue(OzoneConsts.CLUSTER_ID, this.scmManager.getScmStorage()
|
||||
.getClusterID())
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -20,6 +20,7 @@
|
||||
import org.apache.commons.lang3.RandomUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
||||
import org.apache.hadoop.hdds.scm.TestUtils;
|
||||
import org.apache.hadoop.hdds.scm.VersionInfo;
|
||||
@ -125,12 +126,14 @@ public void testGetVersion() throws Exception {
|
||||
* how the state machine would make the call.
|
||||
*/
|
||||
public void testGetVersionTask() throws Exception {
|
||||
Configuration conf = SCMTestUtils.getConf();
|
||||
OzoneConfiguration conf = SCMTestUtils.getConf();
|
||||
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
|
||||
serverAddress, 1000)) {
|
||||
OzoneContainer ozoneContainer = new OzoneContainer(getDatanodeDetails(),
|
||||
conf);
|
||||
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
||||
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
||||
conf);
|
||||
conf, ozoneContainer);
|
||||
EndpointStateMachine.EndPointStates newState = versionTask.call();
|
||||
|
||||
// if version call worked the endpoint should automatically move to the
|
||||
@ -149,14 +152,16 @@ public void testGetVersionTask() throws Exception {
|
||||
* expect that versionTask should be able to handle it.
|
||||
*/
|
||||
public void testGetVersionToInvalidEndpoint() throws Exception {
|
||||
Configuration conf = SCMTestUtils.getConf();
|
||||
OzoneConfiguration conf = SCMTestUtils.getConf();
|
||||
InetSocketAddress nonExistentServerAddress = SCMTestUtils
|
||||
.getReuseableAddress();
|
||||
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
|
||||
nonExistentServerAddress, 1000)) {
|
||||
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
||||
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
||||
OzoneContainer ozoneContainer = new OzoneContainer(getDatanodeDetails(),
|
||||
conf);
|
||||
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
||||
conf, ozoneContainer);
|
||||
EndpointStateMachine.EndPointStates newState = versionTask.call();
|
||||
|
||||
// This version call did NOT work, so endpoint should remain in the same
|
||||
@ -175,13 +180,15 @@ public void testGetVersionToInvalidEndpoint() throws Exception {
|
||||
public void testGetVersionAssertRpcTimeOut() throws Exception {
|
||||
final long rpcTimeout = 1000;
|
||||
final long tolerance = 100;
|
||||
Configuration conf = SCMTestUtils.getConf();
|
||||
OzoneConfiguration conf = SCMTestUtils.getConf();
|
||||
|
||||
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
|
||||
serverAddress, (int) rpcTimeout)) {
|
||||
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
||||
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
||||
OzoneContainer ozoneContainer = new OzoneContainer(getDatanodeDetails(),
|
||||
conf);
|
||||
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
||||
conf, ozoneContainer);
|
||||
|
||||
scmServerImpl.setRpcResponseDelay(1500);
|
||||
long start = Time.monotonicNow();
|
||||
@ -386,4 +393,5 @@ private ContainerReportsProto createContainerReport(
|
||||
}
|
||||
return reportsBuilder.build();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -27,8 +27,10 @@
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
|
||||
@ -163,8 +165,9 @@ private MetadataStore getContainerMetadata(Long containerID)
|
||||
DatanodeDetails leadDN = container.getPipeline().getLeader();
|
||||
OzoneContainer containerServer =
|
||||
getContainerServerByDatanodeUuid(leadDN.getUuidString());
|
||||
ContainerData containerData = containerServer.getContainerManager()
|
||||
.readContainer(containerID);
|
||||
KeyValueContainerData containerData = (KeyValueContainerData) containerServer
|
||||
.getContainerSet()
|
||||
.getContainer(containerID).getContainerData();
|
||||
return KeyUtils.getDB(containerData, conf);
|
||||
}
|
||||
|
||||
|
@ -32,7 +32,7 @@
|
||||
import org.apache.hadoop.ozone.client.OzoneClient;
|
||||
import org.apache.hadoop.ozone.client.OzoneClientFactory;
|
||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
|
||||
@ -183,7 +183,7 @@ public void testCloseContainerViaRatis() throws IOException,
|
||||
for (DatanodeDetails datanodeDetails : datanodes) {
|
||||
GenericTestUtils.waitFor(
|
||||
() -> isContainerClosed(cluster, containerID, datanodeDetails), 500,
|
||||
5 * 1000);
|
||||
15 * 1000);
|
||||
//double check if it's really closed (waitFor also throws an exception)
|
||||
Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails));
|
||||
}
|
||||
@ -204,7 +204,7 @@ private Boolean isContainerClosed(MiniOzoneCluster cluster, long containerID,
|
||||
if (datanode.equals(datanodeService.getDatanodeDetails())) {
|
||||
containerData =
|
||||
datanodeService.getDatanodeStateMachine().getContainer()
|
||||
.getContainerManager().readContainer(containerID);
|
||||
.getContainerSet().getContainer(containerID).getContainerData();
|
||||
if (!containerData.isOpen()) {
|
||||
// make sure the closeContainerHandler on the Datanode is invoked
|
||||
Assert.assertTrue(
|
||||
|
@ -27,7 +27,7 @@
|
||||
import org.apache.hadoop.hdds.client.ReplicationType;
|
||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||
import org.apache.hadoop.ozone.client.rest.OzoneException;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
|
||||
@ -104,8 +104,8 @@ private Boolean isContainerClosed(MiniOzoneCluster cluster,
|
||||
ContainerData containerData;
|
||||
try {
|
||||
containerData = cluster.getHddsDatanodes().get(0)
|
||||
.getDatanodeStateMachine().getContainer().getContainerManager()
|
||||
.readContainer(containerID);
|
||||
.getDatanodeStateMachine().getContainer().getContainerSet()
|
||||
.getContainer(containerID).getContainerData();
|
||||
return !containerData.isOpen();
|
||||
} catch (StorageContainerException e) {
|
||||
throw new AssertionError(e);
|
||||
|
@ -35,10 +35,7 @@
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
@ -66,7 +63,11 @@ public void testCreateOzoneContainer() throws Exception {
|
||||
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, pipeline.getLeader()
|
||||
.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue());
|
||||
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
|
||||
container = new OzoneContainer(TestUtils.getDatanodeDetails(), conf);
|
||||
|
||||
container = new OzoneContainer(TestUtils.getDatanodeDetails(),
|
||||
conf);
|
||||
//Setting scmId, as we start manually ozone container.
|
||||
container.getDispatcher().setScmId(UUID.randomUUID().toString());
|
||||
container.start();
|
||||
|
||||
XceiverClient client = new XceiverClient(pipeline, conf);
|
||||
@ -392,7 +393,7 @@ public void testDeleteContainer() throws Exception {
|
||||
response = client.sendCommand(request);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.UNCLOSED_CONTAINER_IO,
|
||||
Assert.assertEquals(ContainerProtos.Result.DELETE_ON_OPEN_CONTAINER,
|
||||
response.getResult());
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.ozone.container.server;
|
||||
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
|
||||
import org.apache.ratis.shaded.io.netty.channel.embedded.EmbeddedChannel;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
@ -262,5 +263,14 @@ public void init() {
|
||||
@Override
|
||||
public void shutdown() {
|
||||
}
|
||||
@Override
|
||||
public Handler getHandler(ContainerProtos.ContainerType containerType) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setScmId(String scmId) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
@ -27,8 +27,8 @@
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.client.*;
|
||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
@ -119,8 +119,8 @@ public void testContainerReportKeyWrite() throws Exception {
|
||||
|
||||
ContainerData cd = getContainerData(keyInfo.getContainerID());
|
||||
|
||||
LOG.info("DN Container Data: keyCount: {} used: {} ",
|
||||
cd.getKeyCount(), cd.getBytesUsed());
|
||||
/* LOG.info("DN Container Data: keyCount: {} used: {} ",
|
||||
cd.getKeyCount(), cd.getBytesUsed());*/
|
||||
|
||||
ContainerInfo cinfo = scm.getContainerInfo(keyInfo.getContainerID());
|
||||
|
||||
@ -132,9 +132,9 @@ public void testContainerReportKeyWrite() throws Exception {
|
||||
private static ContainerData getContainerData(long containerID) {
|
||||
ContainerData containerData;
|
||||
try {
|
||||
ContainerManager containerManager = cluster.getHddsDatanodes().get(0)
|
||||
.getDatanodeStateMachine().getContainer().getContainerManager();
|
||||
containerData = containerManager.readContainer(containerID);
|
||||
ContainerSet containerManager = cluster.getHddsDatanodes().get(0)
|
||||
.getDatanodeStateMachine().getContainer().getContainerSet();
|
||||
containerData = containerManager.getContainer(containerID).getContainerData();
|
||||
} catch (StorageContainerException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
|
@ -44,9 +44,10 @@
|
||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.ozone.client.rpc.RpcClient;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||
import org.apache.hadoop.ozone.ksm.KeySpaceManager;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
|
||||
@ -698,13 +699,16 @@ public void testDeleteKey() throws Exception {
|
||||
List<KsmKeyLocationInfo> locations =
|
||||
keyInfo.getLatestVersionLocations().getLocationList();
|
||||
for (KsmKeyLocationInfo location : locations) {
|
||||
KeyData keyData = new KeyData(location.getBlockID());
|
||||
KeyData blockInfo = cm.getContainerManager()
|
||||
.getKeyManager().getKey(keyData);
|
||||
ContainerData containerData = cm.getContainerManager()
|
||||
.readContainer(keyData.getContainerID());
|
||||
File dataDir = ContainerUtils
|
||||
.getDataDirectory(containerData).toFile();
|
||||
KeyValueHandler keyValueHandler = (KeyValueHandler) cm
|
||||
.getDispatcher().getHandler(ContainerProtos.ContainerType
|
||||
.KeyValueContainer);
|
||||
KeyValueContainer container = (KeyValueContainer) cm.getContainerSet()
|
||||
.getContainer(location.getBlockID().getContainerID());
|
||||
KeyData blockInfo = keyValueHandler
|
||||
.getKeyManager().getKey(container, location.getBlockID());
|
||||
KeyValueContainerData containerData = (KeyValueContainerData) container
|
||||
.getContainerData();
|
||||
File dataDir = new File(containerData.getChunksPath());
|
||||
for (ContainerProtos.ChunkInfo chunkInfo : blockInfo.getChunks()) {
|
||||
File chunkFile = dataDir.toPath()
|
||||
.resolve(chunkInfo.getChunkName()).toFile();
|
||||
|
Loading…
Reference in New Issue
Block a user