HDDS-837. Persist originNodeId as part of .container file in datanode.

Contributed by Nanda kumar.
This commit is contained in:
Nanda kumar 2018-11-19 14:38:45 +05:30
parent 10cf5773ba
commit 5a7ca6ac3e
36 changed files with 263 additions and 135 deletions

View File

@ -188,6 +188,8 @@ private OzoneConsts() {
public static final String CHUNKS_PATH = "chunksPath";
public static final String CONTAINER_DB_TYPE = "containerDBType";
public static final String CHECKSUM = "checksum";
public static final String ORIGIN_PIPELINE_ID = "originPipelineId";
public static final String ORIGIN_NODE_ID = "originNodeId";
// For OM Audit usage
public static final String VOLUME = "volume";

View File

@ -168,31 +168,32 @@ message ContainerCommandRequestProto {
required int64 containerID = 3;
required string datanodeUuid = 4;
optional string pipelineID = 5;
// One of the following command is available when the corresponding
// cmdType is set. At the protocol level we allow only
// one command in each packet.
// TODO : Upgrade to Protobuf 2.6 or later.
optional CreateContainerRequestProto createContainer = 5;
optional ReadContainerRequestProto readContainer = 6;
optional UpdateContainerRequestProto updateContainer = 7;
optional DeleteContainerRequestProto deleteContainer = 8;
optional ListContainerRequestProto listContainer = 9;
optional CloseContainerRequestProto closeContainer = 10;
optional CreateContainerRequestProto createContainer = 6;
optional ReadContainerRequestProto readContainer = 7;
optional UpdateContainerRequestProto updateContainer = 8;
optional DeleteContainerRequestProto deleteContainer = 9;
optional ListContainerRequestProto listContainer = 10;
optional CloseContainerRequestProto closeContainer = 11;
optional PutBlockRequestProto putBlock = 11;
optional GetBlockRequestProto getBlock = 12;
optional DeleteBlockRequestProto deleteBlock = 13;
optional ListBlockRequestProto listBlock = 14;
optional PutBlockRequestProto putBlock = 12;
optional GetBlockRequestProto getBlock = 13;
optional DeleteBlockRequestProto deleteBlock = 14;
optional ListBlockRequestProto listBlock = 15;
optional ReadChunkRequestProto readChunk = 15;
optional WriteChunkRequestProto writeChunk = 16;
optional DeleteChunkRequestProto deleteChunk = 17;
optional ListChunkRequestProto listChunk = 18;
optional ReadChunkRequestProto readChunk = 16;
optional WriteChunkRequestProto writeChunk = 17;
optional DeleteChunkRequestProto deleteChunk = 18;
optional ListChunkRequestProto listChunk = 19;
optional PutSmallFileRequestProto putSmallFile = 19;
optional GetSmallFileRequestProto getSmallFile = 20;
optional GetCommittedBlockLengthRequestProto getCommittedBlockLength = 21;
optional PutSmallFileRequestProto putSmallFile = 20;
optional GetSmallFileRequestProto getSmallFile = 21;
optional GetCommittedBlockLengthRequestProto getCommittedBlockLength = 22;
}
message ContainerCommandResponseProto {

View File

@ -42,6 +42,8 @@
import static org.apache.hadoop.ozone.OzoneConsts.LAYOUTVERSION;
import static org.apache.hadoop.ozone.OzoneConsts.MAX_SIZE;
import static org.apache.hadoop.ozone.OzoneConsts.METADATA;
import static org.apache.hadoop.ozone.OzoneConsts.ORIGIN_NODE_ID;
import static org.apache.hadoop.ozone.OzoneConsts.ORIGIN_PIPELINE_ID;
import static org.apache.hadoop.ozone.OzoneConsts.STATE;
/**
@ -69,6 +71,11 @@ public abstract class ContainerData {
private final long maxSize;
//ID of the pipeline where this container is created
private String originPipelineId;
//ID of the datanode where this container is created
private String originNodeId;
/** parameters for read/write statistics on the container. **/
private final AtomicLong readBytes;
private final AtomicLong writeBytes;
@ -93,17 +100,22 @@ public abstract class ContainerData {
STATE,
METADATA,
MAX_SIZE,
CHECKSUM));
CHECKSUM,
ORIGIN_PIPELINE_ID,
ORIGIN_NODE_ID));
/**
* Creates a ContainerData Object, which holds metadata of the container.
* @param type - ContainerType
* @param containerId - ContainerId
* @param size - container maximum size in bytes
* @param originPipelineId - Pipeline Id where this container is/was created
* @param originNodeId - Node Id where this container is/was created
*/
protected ContainerData(ContainerType type, long containerId, long size) {
this(type, containerId,
ChunkLayOutVersion.getLatestVersion().getVersion(), size);
protected ContainerData(ContainerType type, long containerId, long size,
String originPipelineId, String originNodeId) {
this(type, containerId, ChunkLayOutVersion.getLatestVersion().getVersion(),
size, originPipelineId, originNodeId);
}
/**
@ -112,9 +124,12 @@ protected ContainerData(ContainerType type, long containerId, long size) {
* @param containerId - ContainerId
* @param layOutVersion - Container layOutVersion
* @param size - Container maximum size in bytes
* @param originPipelineId - Pipeline Id where this container is/was created
* @param originNodeId - Node Id where this container is/was created
*/
protected ContainerData(ContainerType type, long containerId,
int layOutVersion, long size) {
int layOutVersion, long size, String originPipelineId,
String originNodeId) {
Preconditions.checkNotNull(type);
this.containerType = type;
@ -129,6 +144,8 @@ protected ContainerData(ContainerType type, long containerId,
this.bytesUsed = new AtomicLong(0L);
this.keyCount = new AtomicLong(0L);
this.maxSize = size;
this.originPipelineId = originPipelineId;
this.originNodeId = originNodeId;
setChecksumTo0ByteArray();
}
@ -418,6 +435,23 @@ public String getChecksum() {
return this.checksum;
}
/**
* Returns the origin pipeline Id of this container.
* @return origin node Id
*/
public String getOriginPipelineId() {
return originPipelineId;
}
/**
* Returns the origin node Id of this container.
* @return origin node Id
*/
public String getOriginNodeId() {
return originNodeId;
}
/**
* Compute the checksum for ContainerData using the specified Yaml (based
* on ContainerType) and set the checksum.

View File

@ -238,9 +238,14 @@ public Object construct(Node node) {
long size = (long) nodes.get(OzoneConsts.MAX_SIZE);
String originPipelineId = (String) nodes.get(
OzoneConsts.ORIGIN_PIPELINE_ID);
String originNodeId = (String) nodes.get(OzoneConsts.ORIGIN_NODE_ID);
//When a new field is added, it needs to be added here.
KeyValueContainerData kvData = new KeyValueContainerData(
(long) nodes.get(OzoneConsts.CONTAINER_ID), lv, size);
(long) nodes.get(OzoneConsts.CONTAINER_ID), lv, size,
originPipelineId, originNodeId);
kvData.setContainerDBType((String)nodes.get(
OzoneConsts.CONTAINER_DB_TYPE));

View File

@ -262,6 +262,7 @@ private void createContainer(ContainerCommandRequestProto containerRequest) {
.setCmdType(ContainerProtos.Type.CreateContainer)
.setContainerID(containerRequest.getContainerID())
.setCreateContainer(createRequest.build())
.setPipelineID(containerRequest.getPipelineID())
.setDatanodeUuid(containerRequest.getDatanodeUuid())
.setTraceID(containerRequest.getTraceID());

View File

@ -23,6 +23,7 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@ -52,6 +53,7 @@ public abstract class Handler {
protected final ContainerMetrics metrics;
private final StateContext context;
private final DatanodeDetails datanodeDetails;
protected Handler(Configuration config, StateContext context,
ContainerSet contSet, VolumeSet volumeSet,
@ -61,6 +63,7 @@ protected Handler(Configuration config, StateContext context,
this.containerSet = contSet;
this.volumeSet = volumeSet;
this.metrics = containerMetrics;
this.datanodeDetails = context.getParent().getDatanodeDetails();
}
public static Handler getHandlerForContainerType(
@ -76,6 +79,13 @@ public static Handler getHandlerForContainerType(
}
}
/**
* Returns the Id of this datanode.
* @return datanode Id
*/
protected DatanodeDetails getDatanodeDetails() {
return datanodeDetails;
}
/**
* This should be called whenever there is state change. It will trigger
* an ICR to SCM.
@ -101,6 +111,8 @@ public abstract ContainerCommandResponseProto handle(
public abstract Container importContainer(
long containerID,
long maxSize,
String originPipelineId,
String originNodeId,
FileInputStream rawContainerStream,
TarContainerPacker packer)
throws IOException;

View File

@ -281,7 +281,12 @@ private ByteString getStateMachineData(StateMachineLogEntryProto entryProto) {
private ContainerCommandRequestProto getRequestProto(ByteString request)
throws InvalidProtocolBufferException {
return ContainerCommandRequestProto.parseFrom(request);
// TODO: We can avoid creating new builder and set pipeline Id if
// the client is already sending the pipeline id, then we just have to
// validate the pipeline Id.
return ContainerCommandRequestProto.newBuilder(
ContainerCommandRequestProto.parseFrom(request))
.setPipelineID(gid.getUuid().toString()).build();
}
private ContainerCommandResponseProto dispatchCommand(

View File

@ -550,7 +550,8 @@ public ContainerReplicaProto getContainerReport()
.setUsed(containerData.getBytesUsed())
.setState(getHddsState())
.setDeleteTransactionId(containerData.getDeleteTransactionId())
.setBlockCommitSequenceId(containerData.getBlockCommitSequenceId());
.setBlockCommitSequenceId(containerData.getBlockCommitSequenceId())
.setOriginNodeId(containerData.getOriginNodeId());
return ciBuilder.build();
}

View File

@ -18,22 +18,17 @@
package org.apache.hadoop.ozone.container.keyvalue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.util.Collections;
import org.apache.hadoop.conf.StorageSize;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerDataProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.yaml.snakeyaml.nodes.Tag;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@ -90,8 +85,10 @@ public class KeyValueContainerData extends ContainerData {
* @param id - ContainerId
* @param size - maximum size of the container in bytes
*/
public KeyValueContainerData(long id, long size) {
super(ContainerProtos.ContainerType.KeyValueContainer, id, size);
public KeyValueContainerData(long id, long size,
String originPipelineId, String originNodeId) {
super(ContainerProtos.ContainerType.KeyValueContainer, id, size,
originPipelineId, originNodeId);
this.numPendingDeletionBlocks = new AtomicInteger(0);
this.deleteTransactionId = 0;
}
@ -102,9 +99,10 @@ public KeyValueContainerData(long id, long size) {
* @param layOutVersion
* @param size - maximum size of the container in bytes
*/
public KeyValueContainerData(long id, int layOutVersion, long size) {
public KeyValueContainerData(long id, int layOutVersion, long size,
String originPipelineId, String originNodeId) {
super(ContainerProtos.ContainerType.KeyValueContainer, id, layOutVersion,
size);
size, originPipelineId, originNodeId);
this.numPendingDeletionBlocks = new AtomicInteger(0);
this.deleteTransactionId = 0;
}
@ -275,40 +273,4 @@ public static List<String> getYamlFields() {
return Collections.unmodifiableList(KV_YAML_FIELDS);
}
/**
* Constructs a KeyValueContainerData object from ProtoBuf classes.
*
* @param protoData - ProtoBuf Message
* @throws IOException
*/
@VisibleForTesting
public static KeyValueContainerData getFromProtoBuf(
ContainerDataProto protoData) throws IOException {
// TODO: Add containerMaxSize to ContainerProtos.ContainerData
StorageSize storageSize = StorageSize.parse(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
KeyValueContainerData data = new KeyValueContainerData(
protoData.getContainerID(),
(long)storageSize.getUnit().toBytes(storageSize.getValue()));
for (int x = 0; x < protoData.getMetadataCount(); x++) {
data.addMetadata(protoData.getMetadata(x).getKey(),
protoData.getMetadata(x).getValue());
}
if (protoData.hasContainerPath()) {
String metadataPath = protoData.getContainerPath()+ File.separator +
OzoneConsts.CONTAINER_META_PATH;
data.setMetadataPath(metadataPath);
}
if (protoData.hasState()) {
data.setState(protoData.getState());
}
if (protoData.hasBytesUsed()) {
data.setBytesUsed(protoData.getBytesUsed());
}
return data;
}
}

View File

@ -220,7 +220,8 @@ ContainerCommandResponseProto handleCreateContainer(
long containerID = request.getContainerID();
KeyValueContainerData newContainerData = new KeyValueContainerData(
containerID, maxContainerSize);
containerID, maxContainerSize, request.getPipelineID(),
getDatanodeDetails().getUuidString());
// TODO: Add support to add metadataList to ContainerData. Add metadata
// to container during creation.
KeyValueContainer newContainer = new KeyValueContainer(
@ -772,13 +773,15 @@ private void checkContainerOpen(KeyValueContainer kvContainer)
}
public Container importContainer(long containerID, long maxSize,
String originPipelineId,
String originNodeId,
FileInputStream rawContainerStream,
TarContainerPacker packer)
throws IOException {
KeyValueContainerData containerData =
new KeyValueContainerData(containerID,
maxSize);
maxSize, originPipelineId, originNodeId);
KeyValueContainer container = new KeyValueContainer(containerData,
conf);

View File

@ -105,11 +105,12 @@ public void closeContainer(final long containerId) throws IOException {
}
public Container importContainer(final ContainerType type,
final long containerId, final long maxSize,
final FileInputStream rawContainerStream, final TarContainerPacker packer)
final long containerId, final long maxSize, final String originPipelineId,
final String originNodeId, final FileInputStream rawContainerStream,
final TarContainerPacker packer)
throws IOException {
return handlers.get(type).importContainer(
containerId, maxSize, rawContainerStream, packer);
return handlers.get(type).importContainer(containerId, maxSize,
originPipelineId, originNodeId, rawContainerStream, packer);
}
/**

View File

@ -83,6 +83,8 @@ public void importContainer(long containerID, Path tarFilePath) {
originalContainerData.getContainerType(),
containerID,
originalContainerData.getMaxSize(),
originalContainerData.getOriginPipelineId(),
originalContainerData.getOriginNodeId(),
tempContainerTarStream,
packer);

View File

@ -157,6 +157,7 @@ message ContainerReplicaProto {
optional string finalhash = 10;
optional int64 deleteTransactionId = 11;
optional uint64 blockCommitSequenceId = 12;
optional string originNodeId = 13;
}
message CommandStatusReportsProto {

View File

@ -25,6 +25,7 @@
import static org.junit.Assert.assertEquals;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -43,9 +44,11 @@ public void testKeyValueData() {
ContainerProtos.ContainerDataProto.State state =
ContainerProtos.ContainerDataProto.State.CLOSED;
AtomicLong val = new AtomicLong(0);
UUID pipelineId = UUID.randomUUID();
UUID datanodeId = UUID.randomUUID();
KeyValueContainerData kvData = new KeyValueContainerData(containerId,
MAXSIZE);
MAXSIZE, pipelineId.toString(), datanodeId.toString());
assertEquals(containerType, kvData.getContainerType());
assertEquals(containerId, kvData.getContainerID());
@ -83,6 +86,8 @@ public void testKeyValueData() {
assertEquals(1, kvData.getWriteCount());
assertEquals(1, kvData.getKeyCount());
assertEquals(1, kvData.getNumPendingDeletionBlocks());
assertEquals(pipelineId.toString(), kvData.getOriginPipelineId());
assertEquals(datanodeId.toString(), kvData.getOriginNodeId());
}
}

View File

@ -29,6 +29,7 @@
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -55,7 +56,8 @@ private File createContainerFile(long containerID) throws IOException {
String containerPath = containerID + ".container";
KeyValueContainerData keyValueContainerData = new KeyValueContainerData(
containerID, MAXSIZE);
containerID, MAXSIZE, UUID.randomUUID().toString(),
UUID.randomUUID().toString());
keyValueContainerData.setContainerDBType("RocksDB");
keyValueContainerData.setMetadataPath(testRoot);
keyValueContainerData.setChunksPath(testRoot);

View File

@ -35,6 +35,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -55,7 +56,8 @@ public void testAddGetRemoveContainer() throws StorageContainerException {
.ContainerDataProto.State.CLOSED;
KeyValueContainerData kvData = new KeyValueContainerData(containerId,
(long) StorageUnit.GB.toBytes(5));
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
UUID.randomUUID().toString());
kvData.setState(state);
KeyValueContainer keyValueContainer = new KeyValueContainer(kvData, new
OzoneConfiguration());
@ -166,7 +168,8 @@ private ContainerSet createContainerSet() throws StorageContainerException {
ContainerSet containerSet = new ContainerSet();
for (int i=0; i<10; i++) {
KeyValueContainerData kvData = new KeyValueContainerData(i,
(long) StorageUnit.GB.toBytes(5));
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
UUID.randomUUID().toString());
if (i%2 == 0) {
kvData.setState(ContainerProtos.ContainerDataProto.State.CLOSED);
} else {

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@ -76,9 +77,14 @@ public void testContainerCloseActionWhenFull() throws IOException {
DatanodeDetails dd = randomDatanodeDetails();
ContainerSet containerSet = new ContainerSet();
VolumeSet volumeSet = new VolumeSet(dd.getUuidString(), conf);
DatanodeStateMachine stateMachine = Mockito.mock(
DatanodeStateMachine.class);
StateContext context = Mockito.mock(StateContext.class);
Mockito.when(stateMachine.getDatanodeDetails()).thenReturn(dd);
Mockito.when(context.getParent()).thenReturn(stateMachine);
KeyValueContainerData containerData = new KeyValueContainerData(1L,
(long) StorageUnit.GB.toBytes(1));
(long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
dd.getUuidString());
Container container = new KeyValueContainer(containerData, conf);
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
scmId.toString());
@ -87,8 +93,8 @@ public void testContainerCloseActionWhenFull() throws IOException {
Map<ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerType containerType : ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(
containerType, conf, null, containerSet, volumeSet, metrics));
Handler.getHandlerForContainerType(containerType, conf, context,
containerSet, volumeSet, metrics));
}
HddsDispatcher hddsDispatcher = new HddsDispatcher(
conf, containerSet, volumeSet, handlers, context, metrics);
@ -125,13 +131,17 @@ public void testCreateContainerWithWriteChunk() throws IOException {
DatanodeDetails dd = randomDatanodeDetails();
ContainerSet containerSet = new ContainerSet();
VolumeSet volumeSet = new VolumeSet(dd.getUuidString(), conf);
DatanodeStateMachine stateMachine = Mockito.mock(
DatanodeStateMachine.class);
StateContext context = Mockito.mock(StateContext.class);
Mockito.when(stateMachine.getDatanodeDetails()).thenReturn(dd);
Mockito.when(context.getParent()).thenReturn(stateMachine);
ContainerMetrics metrics = ContainerMetrics.create(conf);
Map<ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerType containerType : ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(
containerType, conf, null, containerSet, volumeSet, metrics));
Handler.getHandlerForContainerType(containerType, conf, context,
containerSet, volumeSet, metrics));
}
HddsDispatcher hddsDispatcher = new HddsDispatcher(
conf, containerSet, volumeSet, handlers, context, metrics);

View File

@ -20,10 +20,13 @@
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.junit.Assert;
@ -54,13 +57,19 @@ public void setup() throws Exception {
this.conf = new Configuration();
this.containerSet = Mockito.mock(ContainerSet.class);
this.volumeSet = Mockito.mock(VolumeSet.class);
DatanodeDetails datanodeDetails = Mockito.mock(DatanodeDetails.class);
DatanodeStateMachine stateMachine = Mockito.mock(
DatanodeStateMachine.class);
StateContext context = Mockito.mock(StateContext.class);
Mockito.when(stateMachine.getDatanodeDetails()).thenReturn(datanodeDetails);
Mockito.when(context.getParent()).thenReturn(stateMachine);
ContainerMetrics metrics = ContainerMetrics.create(conf);
Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerProtos.ContainerType containerType :
ContainerProtos.ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(
containerType, conf, null, containerSet, volumeSet, metrics));
containerType, conf, context, containerSet, volumeSet, metrics));
}
this.dispatcher = new HddsDispatcher(
conf, containerSet, volumeSet, handlers, null, metrics);

View File

@ -51,7 +51,7 @@
*/
public class TestCloseContainerCommandHandler {
private static final StateContext CONTEXT = Mockito.mock(StateContext.class);
private final StateContext context = Mockito.mock(StateContext.class);
private static File testDir;
@ -62,7 +62,12 @@ private OzoneContainer getOzoneContainer(final OzoneConfiguration conf,
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getPath());
conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, testDir.getPath());
return new OzoneContainer(datanodeDetails, conf, CONTEXT);
final DatanodeStateMachine datanodeStateMachine = Mockito.mock(
DatanodeStateMachine.class);
Mockito.when(datanodeStateMachine.getDatanodeDetails())
.thenReturn(datanodeDetails);
Mockito.when(context.getParent()).thenReturn(datanodeStateMachine);
return new OzoneContainer(datanodeDetails, conf, context);
}
@ -106,20 +111,14 @@ public void testCloseContainerViaRatis()
new CloseContainerCommandHandler();
final CloseContainerCommand command = new CloseContainerCommand(
containerId.getId(), pipelineID);
final DatanodeStateMachine datanodeStateMachine = Mockito.mock(
DatanodeStateMachine.class);
Mockito.when(datanodeStateMachine.getDatanodeDetails())
.thenReturn(datanodeDetails);
Mockito.when(CONTEXT.getParent()).thenReturn(datanodeStateMachine);
closeHandler.handle(command, container, CONTEXT, null);
closeHandler.handle(command, container, context, null);
Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
container.getContainerSet().getContainer(
containerId.getId()).getContainerState());
Mockito.verify(datanodeStateMachine, Mockito.times(2)).triggerHeartbeat();
Mockito.verify(context.getParent(), Mockito.times(2)).triggerHeartbeat();
container.stop();
}
@ -164,20 +163,14 @@ public void testCloseContainerViaStandalone()
// Specify a pipeline which doesn't exist in the datanode.
final CloseContainerCommand command = new CloseContainerCommand(
containerId.getId(), PipelineID.randomId());
final DatanodeStateMachine datanodeStateMachine = Mockito.mock(
DatanodeStateMachine.class);
Mockito.when(datanodeStateMachine.getDatanodeDetails())
.thenReturn(datanodeDetails);
Mockito.when(CONTEXT.getParent()).thenReturn(datanodeStateMachine);
closeHandler.handle(command, container, CONTEXT, null);
closeHandler.handle(command, container, context, null);
Assert.assertEquals(ContainerProtos.ContainerDataProto.State.QUASI_CLOSED,
container.getContainerSet().getContainer(
containerId.getId()).getContainerState());
Mockito.verify(datanodeStateMachine, Mockito.times(2)).triggerHeartbeat();
Mockito.verify(context.getParent(), Mockito.times(2)).triggerHeartbeat();
container.stop();
}

View File

@ -68,9 +68,9 @@ public class TestBlockManagerImpl {
@Before
public void setUp() throws Exception {
config = new OzoneConfiguration();
UUID datanodeId = UUID.randomUUID();
HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
.getAbsolutePath()).conf(config).datanodeUuid(UUID.randomUUID()
.getAbsolutePath()).conf(config).datanodeUuid(datanodeId
.toString()).build();
volumeSet = mock(VolumeSet.class);
@ -80,7 +80,8 @@ public void setUp() throws Exception {
.thenReturn(hddsVolume);
keyValueContainerData = new KeyValueContainerData(1L,
(long) StorageUnit.GB.toBytes(5));
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
datanodeId.toString());
keyValueContainer = new KeyValueContainer(
keyValueContainerData, config);

View File

@ -72,8 +72,9 @@ public class TestChunkManagerImpl {
@Before
public void setUp() throws Exception {
config = new OzoneConfiguration();
UUID datanodeId = UUID.randomUUID();
hddsVolume = new HddsVolume.Builder(folder.getRoot()
.getAbsolutePath()).conf(config).datanodeUuid(UUID.randomUUID()
.getAbsolutePath()).conf(config).datanodeUuid(datanodeId
.toString()).build();
volumeSet = mock(VolumeSet.class);
@ -83,7 +84,8 @@ public void setUp() throws Exception {
.thenReturn(hddsVolume);
keyValueContainerData = new KeyValueContainerData(1L,
(long) StorageUnit.GB.toBytes(5));
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
datanodeId.toString());
keyValueContainer = new KeyValueContainer(keyValueContainerData, config);

View File

@ -246,7 +246,8 @@ private void createContainerWithBlocks(long containerId, int
normalBlocks, int deletedBlocks) throws
Exception {
containerData = new KeyValueContainerData(containerId,
(long) StorageUnit.GB.toBytes(1));
(long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
UUID.randomUUID().toString());
container = new KeyValueContainer(containerData, conf);
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), UUID
.randomUUID().toString());

View File

@ -81,12 +81,14 @@ public class TestKeyValueContainer {
private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
private KeyValueContainerData keyValueContainerData;
private KeyValueContainer keyValueContainer;
private UUID datanodeId;
@Before
public void setUp() throws Exception {
conf = new OzoneConfiguration();
datanodeId = UUID.randomUUID();
HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
.getAbsolutePath()).conf(conf).datanodeUuid(UUID.randomUUID()
.getAbsolutePath()).conf(conf).datanodeUuid(datanodeId
.toString()).build();
volumeSet = mock(VolumeSet.class);
@ -95,7 +97,8 @@ public void setUp() throws Exception {
.thenReturn(hddsVolume);
keyValueContainerData = new KeyValueContainerData(1L,
(long) StorageUnit.GB.toBytes(5));
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
datanodeId.toString());
keyValueContainer = new KeyValueContainer(
keyValueContainerData, conf);
@ -105,7 +108,8 @@ public void setUp() throws Exception {
@Test
public void testBlockIterator() throws Exception{
keyValueContainerData = new KeyValueContainerData(100L,
(long) StorageUnit.GB.toBytes(1));
(long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
datanodeId.toString());
keyValueContainer = new KeyValueContainer(
keyValueContainerData, conf);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
@ -214,7 +218,8 @@ public void testContainerImportExport() throws Exception {
//create a new one
KeyValueContainerData containerData =
new KeyValueContainerData(containerId, 1,
keyValueContainerData.getMaxSize());
keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
datanodeId.toString());
KeyValueContainer container = new KeyValueContainer(containerData, conf);
HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
@ -32,6 +33,8 @@
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
@ -225,7 +228,14 @@ public void testVolumeSetInKeyValueHandler() throws Exception{
interval[0] = 2;
ContainerMetrics metrics = new ContainerMetrics(interval);
VolumeSet volumeSet = new VolumeSet(UUID.randomUUID().toString(), conf);
KeyValueHandler keyValueHandler = new KeyValueHandler(conf, null, cset,
DatanodeDetails datanodeDetails = Mockito.mock(DatanodeDetails.class);
DatanodeStateMachine stateMachine = Mockito.mock(
DatanodeStateMachine.class);
StateContext context = Mockito.mock(StateContext.class);
Mockito.when(stateMachine.getDatanodeDetails())
.thenReturn(datanodeDetails);
Mockito.when(context.getParent()).thenReturn(stateMachine);
KeyValueHandler keyValueHandler = new KeyValueHandler(conf, context, cset,
volumeSet, metrics);
assertEquals("org.apache.hadoop.ozone.container.common" +
".volume.RoundRobinVolumeChoosingPolicy",
@ -236,7 +246,7 @@ public void testVolumeSetInKeyValueHandler() throws Exception{
conf.set(HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
"org.apache.hadoop.ozone.container.common.impl.HddsDispatcher");
try {
new KeyValueHandler(conf, null, cset, volumeSet, metrics);
new KeyValueHandler(conf, context, cset, volumeSet, metrics);
} catch (RuntimeException ex) {
GenericTestUtils.assertExceptionContains("class org.apache.hadoop" +
".ozone.container.common.impl.HddsDispatcher not org.apache" +
@ -266,7 +276,8 @@ public void testCloseInvalidContainer() throws IOException {
long containerID = 1234L;
Configuration conf = new Configuration();
KeyValueContainerData kvData = new KeyValueContainerData(containerID,
(long) StorageUnit.GB.toBytes(1));
(long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
UUID.randomUUID().toString());
KeyValueContainer container = new KeyValueContainer(kvData, conf);
kvData.setState(ContainerProtos.ContainerDataProto.State.INVALID);

View File

@ -29,6 +29,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
@ -91,7 +92,8 @@ private KeyValueContainerData createContainer(long id, Path dir,
Files.createDirectories(dbDir);
Files.createDirectories(dataDir);
KeyValueContainerData containerData = new KeyValueContainerData(id, -1);
KeyValueContainerData containerData = new KeyValueContainerData(
id, -1, UUID.randomUUID().toString(), UUID.randomUUID().toString());
containerData.setChunksPath(dataDir.toString());
containerData.setMetadataPath(dbDir.getParent().toString());
containerData.setDbFile(dbDir.toFile());

View File

@ -25,6 +25,8 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@ -34,6 +36,8 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import java.util.Random;
import java.util.UUID;
@ -79,16 +83,22 @@ public void testBuildContainerMap() throws Exception {
// Add containers to disk
for (int i=0; i<10; i++) {
keyValueContainerData = new KeyValueContainerData(i,
(long) StorageUnit.GB.toBytes(1));
(long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
datanodeDetails.getUuidString());
keyValueContainer = new KeyValueContainer(
keyValueContainerData, conf);
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
}
DatanodeStateMachine stateMachine = Mockito.mock(
DatanodeStateMachine.class);
StateContext context = Mockito.mock(StateContext.class);
Mockito.when(stateMachine.getDatanodeDetails()).thenReturn(datanodeDetails);
Mockito.when(context.getParent()).thenReturn(stateMachine);
// When OzoneContainer is started, the containers from disk should be
// loaded into the containerSet.
OzoneContainer ozoneContainer = new
OzoneContainer(datanodeDetails, conf, null);
OzoneContainer(datanodeDetails, conf, context);
ContainerSet containerset = ozoneContainer.getContainerSet();
assertEquals(10, containerset.containerCount());
}

View File

@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -118,7 +119,8 @@ private class FakeReplicator implements ContainerReplicator {
@Override
public void replicate(ReplicationTask task) {
KeyValueContainerData kvcd =
new KeyValueContainerData(task.getContainerId(), 100L);
new KeyValueContainerData(task.getContainerId(), 100L,
UUID.randomUUID().toString(), UUID.randomUUID().toString());
KeyValueContainer kvc =
new KeyValueContainer(kvcd, conf);
try {

View File

@ -6,7 +6,9 @@ containerType: KeyValueContainer
metadataPath: /hdds/current/aed-fg4-hji-jkl/containerDir0/1
layOutVersion: 1
maxSize: 5368709120
originPipelineId: 1297e8a9-2850-4ced-b96c-5ae31d2c73ad
originNodeId: 7f541a06-6c26-476d-9994-c6e1947e11cb
metadata: {OWNER: ozone, VOLUME: hdfs}
state: CLOSED
aclEnabled: true
checksum: c5b5373b8755c4e7199478dcaded9d996f9aca089704e08950259cdb0f290680
checksum: 61db56da7d50798561b5365c123c5fbf7faf99fbbbd571a746af79020b7f79ba

View File

@ -6,6 +6,8 @@ containerType: KeyValueContainer
metadataPath: /hdds/current/aed-fg4-hji-jkl/containerdir0/1
layOutVersion: 1
maxSize: 5368709120
originPipelineId: 4d41dd20-6d73-496a-b247-4c6cb483f54e
originNodeId: 54842560-67a5-48a5-a7d4-4701d9538706
metadata: {OWNER: ozone, VOLUME: hdfs}
state: OPEN
checksum: 08bc9d390f9183aeed3cf33c789e2a07310bba60f3cf55941caccc939db8670f

View File

@ -6,6 +6,8 @@ containerType: KeyValueContainer
metadataPath: /hdds/current/aed-fg4-hji-jkl/containerDir0/1
layOutVersion: 1
maxSize: 5368709120
originPipelineId: b2c96aa4-b757-4f97-b286-6fb80a1baf8e
originNodeId: 6dcfb385-caea-4efb-9ef3-f87fadca0f51
metadata: {OWNER: ozone, VOLUME: hdfs}
state: INVALID
checksum: 08bc9d390f9183aeed3cf33c789e2a07310bba60f3cf55941caccc939db8670f

View File

@ -154,8 +154,9 @@ public void testGetVersionTask() throws Exception {
OzoneConfiguration conf = SCMTestUtils.getConf();
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
serverAddress, 1000)) {
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
OzoneContainer ozoneContainer = new OzoneContainer(
TestUtils.randomDatanodeDetails(), conf, null);
datanodeDetails, conf, getContext(datanodeDetails));
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
conf, ozoneContainer);
@ -178,8 +179,9 @@ public void testCheckVersionResponse() throws Exception {
serverAddress, 1000)) {
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(VersionEndpointTask.LOG);
OzoneContainer ozoneContainer = new OzoneContainer(TestUtils
.randomDatanodeDetails(), conf, null);
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
OzoneContainer ozoneContainer = new OzoneContainer(
datanodeDetails, conf, getContext(datanodeDetails));
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
conf, ozoneContainer);
@ -231,8 +233,9 @@ public void testGetVersionToInvalidEndpoint() throws Exception {
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
nonExistentServerAddress, 1000)) {
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
OzoneContainer ozoneContainer = new OzoneContainer(
TestUtils.randomDatanodeDetails(), conf, null);
datanodeDetails, conf, getContext(datanodeDetails));
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
conf, ozoneContainer);
EndpointStateMachine.EndPointStates newState = versionTask.call();
@ -258,8 +261,9 @@ public void testGetVersionAssertRpcTimeOut() throws Exception {
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
serverAddress, (int) rpcTimeout)) {
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
OzoneContainer ozoneContainer = new OzoneContainer(
TestUtils.randomDatanodeDetails(), conf, null);
datanodeDetails, conf, getContext(datanodeDetails));
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
conf, ozoneContainer);
@ -527,4 +531,13 @@ public void testHeartbeatTaskRpcTimeOut() throws Exception {
lessThanOrEqualTo(rpcTimeout + tolerance));
}
private StateContext getContext(DatanodeDetails datanodeDetails) {
DatanodeStateMachine stateMachine = Mockito.mock(
DatanodeStateMachine.class);
StateContext context = Mockito.mock(StateContext.class);
Mockito.when(stateMachine.getDatanodeDetails()).thenReturn(datanodeDetails);
Mockito.when(context.getParent()).thenReturn(stateMachine);
return context;
}
}

View File

@ -110,7 +110,8 @@ private void createToDeleteBlocks(ContainerSet containerSet,
conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, testRoot.getAbsolutePath());
long containerID = ContainerTestHelper.getTestContainerID();
KeyValueContainerData data = new KeyValueContainerData(containerID,
ContainerTestHelper.CONTAINER_MAX_SIZE);
ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
UUID.randomUUID().toString());
Container container = new KeyValueContainer(data, conf);
container.create(new VolumeSet(scmId, clusterID, conf),
new RoundRobinVolumeChoosingPolicy(), scmId);

View File

@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomUtils;
@ -72,7 +73,8 @@ public void testRandomChoosingPolicy() throws IOException {
int numContainers = 10;
for (int i = 0; i < numContainers; i++) {
KeyValueContainerData data = new KeyValueContainerData(i,
ContainerTestHelper.CONTAINER_MAX_SIZE);
ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
UUID.randomUUID().toString());
KeyValueContainer container = new KeyValueContainer(data, conf);
containerSet.addContainer(container);
Assert.assertTrue(
@ -125,7 +127,9 @@ public void testTopNOrderedChoosingPolicy() throws IOException {
long containerId = RandomUtils.nextLong();
KeyValueContainerData data =
new KeyValueContainerData(containerId,
ContainerTestHelper.CONTAINER_MAX_SIZE);
ContainerTestHelper.CONTAINER_MAX_SIZE,
UUID.randomUUID().toString(),
UUID.randomUUID().toString());
if (i != numContainers) {
int deletionBlocks = random.nextInt(numContainers) + 1;
data.incrPendingDeletionBlocks(deletionBlocks);

View File

@ -157,7 +157,8 @@ private long getTestContainerID() {
private Container addContainer(ContainerSet cSet, long cID)
throws IOException {
KeyValueContainerData data = new KeyValueContainerData(cID,
ContainerTestHelper.CONTAINER_MAX_SIZE);
ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
UUID.randomUUID().toString());
data.addMetadata("VOLUME", "shire");
data.addMetadata("owner)", "bilbo");
KeyValueContainer container = new KeyValueContainer(data, conf);

View File

@ -41,6 +41,8 @@
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.hdds.scm.TestUtils;
@ -51,6 +53,7 @@
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.util.Map;
@ -90,16 +93,22 @@ public void testContainerMetrics() throws Exception {
VolumeSet volumeSet = new VolumeSet(
datanodeDetails.getUuidString(), conf);
ContainerSet containerSet = new ContainerSet();
DatanodeStateMachine stateMachine = Mockito.mock(
DatanodeStateMachine.class);
StateContext context = Mockito.mock(StateContext.class);
Mockito.when(stateMachine.getDatanodeDetails())
.thenReturn(datanodeDetails);
Mockito.when(context.getParent()).thenReturn(stateMachine);
ContainerMetrics metrics = ContainerMetrics.create(conf);
Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerProtos.ContainerType containerType :
ContainerProtos.ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(
containerType, conf, null, containerSet, volumeSet, metrics));
Handler.getHandlerForContainerType(containerType, conf, context,
containerSet, volumeSet, metrics));
}
HddsDispatcher dispatcher = new HddsDispatcher(conf, containerSet,
volumeSet, handlers, null, metrics);
volumeSet, handlers, context, metrics);
dispatcher.setScmId(UUID.randomUUID().toString());
server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,

View File

@ -29,11 +29,14 @@
import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@ -72,8 +75,13 @@ public void testCreateOzoneContainer() throws Exception {
conf.setBoolean(
OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
DatanodeDetails datanodeDetails = Mockito.mock(DatanodeDetails.class);
StateContext context = Mockito.mock(StateContext.class);
DatanodeStateMachine dsm = Mockito.mock(DatanodeStateMachine.class);
Mockito.when(dsm.getDatanodeDetails()).thenReturn(datanodeDetails);
Mockito.when(context.getParent()).thenReturn(dsm);
container = new OzoneContainer(TestUtils.randomDatanodeDetails(),
conf, null);
conf, context);
//Setting scmId, as we start manually ozone container.
container.getDispatcher().setScmId(UUID.randomUUID().toString());
container.start();