HDDS-382. Remove RatisTestHelper#RatisTestSuite constructor argument and fix checkstyle in ContainerTestHelper, GenericTestUtils
Contributed by Nandakumar.
This commit is contained in:
parent
33f42efc94
commit
c5629d546d
@ -38,7 +38,6 @@
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
@ -188,13 +187,12 @@ public ContainerProtos.ContainerCommandResponseProto sendCommand(
|
|||||||
/**
|
/**
|
||||||
* Create a pipeline.
|
* Create a pipeline.
|
||||||
*
|
*
|
||||||
* @param pipeline - pipeline to be created.
|
* @param ignored - pipeline to be created.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void createPipeline(Pipeline pipeline)
|
public void createPipeline(Pipeline ignored)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// For stand alone pipeline, there is no notion called setup pipeline.
|
// For stand alone pipeline, there is no notion called setup pipeline.
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -38,7 +38,6 @@
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
@ -218,13 +217,12 @@ private void reconnect() throws IOException {
|
|||||||
/**
|
/**
|
||||||
* Create a pipeline.
|
* Create a pipeline.
|
||||||
*
|
*
|
||||||
* @param pipeline - pipeline to be created.
|
* @param ignored - pipeline to be created.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void createPipeline(Pipeline pipeline)
|
public void createPipeline(Pipeline ignored)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// For stand alone pipeline, there is no notion called setup pipeline.
|
// For stand alone pipeline, there is no notion called setup pipeline.
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -154,7 +154,7 @@ public XceiverClientSpi call() throws Exception {
|
|||||||
break;
|
break;
|
||||||
case CHAINED:
|
case CHAINED:
|
||||||
default:
|
default:
|
||||||
throw new IOException ("not implemented" + pipeline.getType());
|
throw new IOException("not implemented" + pipeline.getType());
|
||||||
}
|
}
|
||||||
client.connect();
|
client.connect();
|
||||||
return client;
|
return client;
|
||||||
|
@ -65,7 +65,8 @@ public class ChunkInputStream extends InputStream implements Seekable {
|
|||||||
* @param chunks list of chunks to read
|
* @param chunks list of chunks to read
|
||||||
* @param traceID container protocol call traceID
|
* @param traceID container protocol call traceID
|
||||||
*/
|
*/
|
||||||
public ChunkInputStream(BlockID blockID, XceiverClientManager xceiverClientManager,
|
public ChunkInputStream(
|
||||||
|
BlockID blockID, XceiverClientManager xceiverClientManager,
|
||||||
XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) {
|
XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) {
|
||||||
this.blockID = blockID;
|
this.blockID = blockID;
|
||||||
this.traceID = traceID;
|
this.traceID = traceID;
|
||||||
@ -211,8 +212,8 @@ public synchronized void seek(long pos) throws IOException {
|
|||||||
if (pos < 0 || (chunks.size() == 0 && pos > 0)
|
if (pos < 0 || (chunks.size() == 0 && pos > 0)
|
||||||
|| pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1)
|
|| pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1)
|
||||||
.getLen()) {
|
.getLen()) {
|
||||||
throw new EOFException(
|
throw new EOFException("EOF encountered pos: " + pos + " container key: "
|
||||||
"EOF encountered pos: " + pos + " container key: " + blockID.getLocalID());
|
+ blockID.getLocalID());
|
||||||
}
|
}
|
||||||
if (chunkIndex == -1) {
|
if (chunkIndex == -1) {
|
||||||
chunkIndex = Arrays.binarySearch(chunkOffset, pos);
|
chunkIndex = Arrays.binarySearch(chunkOffset, pos);
|
||||||
|
@ -76,8 +76,8 @@ public class ChunkOutputStream extends OutputStream {
|
|||||||
* @param chunkSize chunk size
|
* @param chunkSize chunk size
|
||||||
*/
|
*/
|
||||||
public ChunkOutputStream(BlockID blockID, String key,
|
public ChunkOutputStream(BlockID blockID, String key,
|
||||||
XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
|
XceiverClientManager xceiverClientManager,
|
||||||
String traceID, int chunkSize) {
|
XceiverClientSpi xceiverClient, String traceID, int chunkSize) {
|
||||||
this.blockID = blockID;
|
this.blockID = blockID;
|
||||||
this.key = key;
|
this.key = key;
|
||||||
this.traceID = traceID;
|
this.traceID = traceID;
|
||||||
|
@ -23,7 +23,7 @@
|
|||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* BlockID of ozone (containerID localID)
|
* BlockID of ozone (containerID localID).
|
||||||
*/
|
*/
|
||||||
public class BlockID {
|
public class BlockID {
|
||||||
private long containerID;
|
private long containerID;
|
||||||
@ -65,7 +65,8 @@ public ContainerProtos.DatanodeBlockID getDatanodeBlockIDProtobuf() {
|
|||||||
setContainerID(containerID).setLocalID(localID).build();
|
setContainerID(containerID).setLocalID(localID).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static BlockID getFromProtobuf(ContainerProtos.DatanodeBlockID blockID) {
|
public static BlockID getFromProtobuf(
|
||||||
|
ContainerProtos.DatanodeBlockID blockID) {
|
||||||
return new BlockID(blockID.getContainerID(),
|
return new BlockID(blockID.getContainerID(),
|
||||||
blockID.getLocalID());
|
blockID.getLocalID());
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,6 @@
|
|||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.ContainerCommandRequestProto;
|
.ContainerCommandRequestProto;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
@ -29,7 +28,6 @@
|
|||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -43,8 +43,8 @@ public Builder setPipeline(Pipeline p) {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder setBlockID(BlockID blockID) {
|
public Builder setBlockID(BlockID blockId) {
|
||||||
this.blockID = blockID;
|
this.blockID = blockId;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -396,13 +396,13 @@ public static class Builder {
|
|||||||
private ReplicationType replicationType;
|
private ReplicationType replicationType;
|
||||||
|
|
||||||
public Builder setReplicationType(
|
public Builder setReplicationType(
|
||||||
ReplicationType replicationType) {
|
ReplicationType repType) {
|
||||||
this.replicationType = replicationType;
|
this.replicationType = repType;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder setPipelineID(PipelineID pipelineID) {
|
public Builder setPipelineID(PipelineID pipelineId) {
|
||||||
this.pipelineID = pipelineID;
|
this.pipelineID = pipelineId;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -447,8 +447,8 @@ public Builder setOwner(String containerOwner) {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder setDeleteTransactionId(long deleteTransactionId) {
|
public Builder setDeleteTransactionId(long deleteTransactionID) {
|
||||||
this.deleteTransactionId = deleteTransactionId;
|
this.deleteTransactionId = deleteTransactionID;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,8 +26,8 @@
|
|||||||
/**
|
/**
|
||||||
* Class wraps ozone container info.
|
* Class wraps ozone container info.
|
||||||
*/
|
*/
|
||||||
public class ContainerWithPipeline
|
public class ContainerWithPipeline implements Comparator<ContainerWithPipeline>,
|
||||||
implements Comparator<ContainerWithPipeline>, Comparable<ContainerWithPipeline> {
|
Comparable<ContainerWithPipeline> {
|
||||||
|
|
||||||
private final ContainerInfo containerInfo;
|
private final ContainerInfo containerInfo;
|
||||||
private final Pipeline pipeline;
|
private final Pipeline pipeline;
|
||||||
@ -45,7 +45,8 @@ public Pipeline getPipeline() {
|
|||||||
return pipeline;
|
return pipeline;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ContainerWithPipeline fromProtobuf(HddsProtos.ContainerWithPipeline allocatedContainer) {
|
public static ContainerWithPipeline fromProtobuf(
|
||||||
|
HddsProtos.ContainerWithPipeline allocatedContainer) {
|
||||||
return new ContainerWithPipeline(
|
return new ContainerWithPipeline(
|
||||||
ContainerInfo.fromProtobuf(allocatedContainer.getContainerInfo()),
|
ContainerInfo.fromProtobuf(allocatedContainer.getContainerInfo()),
|
||||||
Pipeline.getFromProtoBuf(allocatedContainer.getPipeline()));
|
Pipeline.getFromProtoBuf(allocatedContainer.getPipeline()));
|
||||||
|
@ -169,8 +169,8 @@ public List<DatanodeDetails> getMachines() {
|
|||||||
*/
|
*/
|
||||||
public List<String> getDatanodeHosts() {
|
public List<String> getDatanodeHosts() {
|
||||||
List<String> dataHosts = new ArrayList<>();
|
List<String> dataHosts = new ArrayList<>();
|
||||||
for (DatanodeDetails id :getDatanodes().values()) {
|
for (DatanodeDetails datanode : getDatanodes().values()) {
|
||||||
dataHosts.add(id.getHostName());
|
dataHosts.add(datanode.getHostName());
|
||||||
}
|
}
|
||||||
return dataHosts;
|
return dataHosts;
|
||||||
}
|
}
|
||||||
@ -219,7 +219,7 @@ public HddsProtos.LifeCycleState getLifeCycleState() {
|
|||||||
* Update the State of the pipeline.
|
* Update the State of the pipeline.
|
||||||
*/
|
*/
|
||||||
public void setLifeCycleState(HddsProtos.LifeCycleState nextState) {
|
public void setLifeCycleState(HddsProtos.LifeCycleState nextState) {
|
||||||
lifeCycleState = nextState;
|
lifeCycleState = nextState;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -244,9 +244,8 @@ public HddsProtos.ReplicationType getType() {
|
|||||||
public String toString() {
|
public String toString() {
|
||||||
final StringBuilder b = new StringBuilder(getClass().getSimpleName())
|
final StringBuilder b = new StringBuilder(getClass().getSimpleName())
|
||||||
.append("[");
|
.append("[");
|
||||||
getDatanodes().keySet().stream()
|
getDatanodes().keySet().forEach(
|
||||||
.forEach(id -> b.
|
node -> b.append(node.endsWith(getLeaderID()) ? "*" + id : id));
|
||||||
append(id.endsWith(getLeaderID()) ? "*" + id : id));
|
|
||||||
b.append(" id:").append(id);
|
b.append(" id:").append(id);
|
||||||
if (getType() != null) {
|
if (getType() != null) {
|
||||||
b.append(" type:").append(getType().toString());
|
b.append(" type:").append(getType().toString());
|
||||||
|
@ -38,7 +38,8 @@ public interface StorageContainerLocationProtocol {
|
|||||||
* set of datanodes that should be used creating this container.
|
* set of datanodes that should be used creating this container.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType replicationType,
|
ContainerWithPipeline allocateContainer(
|
||||||
|
HddsProtos.ReplicationType replicationType,
|
||||||
HddsProtos.ReplicationFactor factor, String owner)
|
HddsProtos.ReplicationFactor factor, String owner)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
@ -61,7 +62,8 @@ ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType replicationTy
|
|||||||
* @return ContainerWithPipeline - the container info with the pipeline.
|
* @return ContainerWithPipeline - the container info with the pipeline.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
ContainerWithPipeline getContainerWithPipeline(long containerID) throws IOException;
|
ContainerWithPipeline getContainerWithPipeline(long containerID)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ask SCM a list of containers with a range of container names
|
* Ask SCM a list of containers with a range of container names
|
||||||
|
@ -97,8 +97,9 @@ public StorageContainerLocationProtocolClientSideTranslatorPB(
|
|||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
|
public ContainerWithPipeline allocateContainer(
|
||||||
HddsProtos.ReplicationFactor factor, String owner) throws IOException {
|
HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
|
||||||
|
String owner) throws IOException {
|
||||||
|
|
||||||
ContainerRequestProto request = ContainerRequestProto.newBuilder()
|
ContainerRequestProto request = ContainerRequestProto.newBuilder()
|
||||||
.setReplicationFactor(factor)
|
.setReplicationFactor(factor)
|
||||||
@ -116,7 +117,8 @@ public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type,
|
|||||||
throw new IOException(response.hasErrorMessage() ?
|
throw new IOException(response.hasErrorMessage() ?
|
||||||
response.getErrorMessage() : "Allocate container failed.");
|
response.getErrorMessage() : "Allocate container failed.");
|
||||||
}
|
}
|
||||||
return ContainerWithPipeline.fromProtobuf(response.getContainerWithPipeline());
|
return ContainerWithPipeline.fromProtobuf(
|
||||||
|
response.getContainerWithPipeline());
|
||||||
}
|
}
|
||||||
|
|
||||||
public ContainerInfo getContainer(long containerID) throws IOException {
|
public ContainerInfo getContainer(long containerID) throws IOException {
|
||||||
@ -138,17 +140,18 @@ public ContainerInfo getContainer(long containerID) throws IOException {
|
|||||||
/**
|
/**
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
public ContainerWithPipeline getContainerWithPipeline(long containerID) throws IOException {
|
public ContainerWithPipeline getContainerWithPipeline(long containerID)
|
||||||
|
throws IOException {
|
||||||
Preconditions.checkState(containerID >= 0,
|
Preconditions.checkState(containerID >= 0,
|
||||||
"Container ID cannot be negative");
|
"Container ID cannot be negative");
|
||||||
GetContainerWithPipelineRequestProto request = GetContainerWithPipelineRequestProto
|
GetContainerWithPipelineRequestProto request =
|
||||||
.newBuilder()
|
GetContainerWithPipelineRequestProto.newBuilder()
|
||||||
.setContainerID(containerID)
|
.setContainerID(containerID).build();
|
||||||
.build();
|
|
||||||
try {
|
try {
|
||||||
GetContainerWithPipelineResponseProto response =
|
GetContainerWithPipelineResponseProto response =
|
||||||
rpcProxy.getContainerWithPipeline(NULL_RPC_CONTROLLER, request);
|
rpcProxy.getContainerWithPipeline(NULL_RPC_CONTROLLER, request);
|
||||||
return ContainerWithPipeline.fromProtobuf(response.getContainerWithPipeline());
|
return ContainerWithPipeline.fromProtobuf(
|
||||||
|
response.getContainerWithPipeline());
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
}
|
}
|
||||||
|
@ -113,8 +113,8 @@ public static GetKeyResponseProto getKey(XceiverClientSpi xceiverClient,
|
|||||||
* @throws IOException if there is an I/O error while performing the call
|
* @throws IOException if there is an I/O error while performing the call
|
||||||
*/
|
*/
|
||||||
public static ContainerProtos.GetCommittedBlockLengthResponseProto
|
public static ContainerProtos.GetCommittedBlockLengthResponseProto
|
||||||
getCommittedBlockLength(
|
getCommittedBlockLength(
|
||||||
XceiverClientSpi xceiverClient, BlockID blockID, String traceID)
|
XceiverClientSpi xceiverClient, BlockID blockID, String traceID)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
ContainerProtos.GetCommittedBlockLengthRequestProto.Builder
|
ContainerProtos.GetCommittedBlockLengthRequestProto.Builder
|
||||||
getBlockLengthRequestBuilder =
|
getBlockLengthRequestBuilder =
|
||||||
@ -375,7 +375,7 @@ public static ReadContainerResponseProto readContainer(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads the data given the blockID
|
* Reads the data given the blockID.
|
||||||
*
|
*
|
||||||
* @param client
|
* @param client
|
||||||
* @param blockID - ID of the block
|
* @param blockID - ID of the block
|
||||||
|
@ -110,11 +110,6 @@ public static Versioning getVersioning(boolean versioning) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Ozone handler types.
|
|
||||||
*/
|
|
||||||
public static final String OZONE_HANDLER_DISTRIBUTED = "distributed";
|
|
||||||
|
|
||||||
public static final String DELETING_KEY_PREFIX = "#deleting#";
|
public static final String DELETING_KEY_PREFIX = "#deleting#";
|
||||||
public static final String DELETED_KEY_PREFIX = "#deleted#";
|
public static final String DELETED_KEY_PREFIX = "#deleted#";
|
||||||
public static final String DELETE_TRANSACTION_KEY_PREFIX = "#delTX#";
|
public static final String DELETE_TRANSACTION_KEY_PREFIX = "#delTX#";
|
||||||
|
@ -37,7 +37,8 @@ public class KeyData {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Represent a list of chunks.
|
* Represent a list of chunks.
|
||||||
* In order to reduce memory usage, chunkList is declared as an {@link Object}.
|
* In order to reduce memory usage, chunkList is declared as an
|
||||||
|
* {@link Object}.
|
||||||
* When #elements == 0, chunkList is null.
|
* When #elements == 0, chunkList is null.
|
||||||
* When #elements == 1, chunkList refers to the only element.
|
* When #elements == 1, chunkList refers to the only element.
|
||||||
* When #elements > 1, chunkList refers to the list.
|
* When #elements > 1, chunkList refers to the list.
|
||||||
@ -157,7 +158,7 @@ public List<ContainerProtos.ChunkInfo> getChunks() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds chinkInfo to the list
|
* Adds chinkInfo to the list.
|
||||||
*/
|
*/
|
||||||
public void addChunk(ContainerProtos.ChunkInfo chunkInfo) {
|
public void addChunk(ContainerProtos.ChunkInfo chunkInfo) {
|
||||||
if (chunkList == null) {
|
if (chunkList == null) {
|
||||||
@ -237,7 +238,8 @@ public void setChunks(List<ContainerProtos.ChunkInfo> chunks) {
|
|||||||
} else {
|
} else {
|
||||||
final int n = chunks.size();
|
final int n = chunks.size();
|
||||||
chunkList = n == 0? null: n == 1? chunks.get(0): chunks;
|
chunkList = n == 0? null: n == 1? chunks.get(0): chunks;
|
||||||
size = chunks.parallelStream().mapToLong(ContainerProtos.ChunkInfo::getLen).sum();
|
size = chunks.parallelStream().mapToLong(
|
||||||
|
ContainerProtos.ChunkInfo::getLen).sum();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,7 +36,8 @@
|
|||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Stable
|
@InterfaceStability.Stable
|
||||||
public class HddsVersionInfo {
|
public class HddsVersionInfo {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(HddsVersionInfo.class);
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
|
HddsVersionInfo.class);
|
||||||
|
|
||||||
private Properties info;
|
private Properties info;
|
||||||
|
|
||||||
@ -95,7 +96,8 @@ protected String _getProtocVersion() {
|
|||||||
return info.getProperty("protocVersion", "Unknown");
|
return info.getProperty("protocVersion", "Unknown");
|
||||||
}
|
}
|
||||||
|
|
||||||
private static HddsVersionInfo HDDS_VERSION_INFO = new HddsVersionInfo("hdds");
|
private static final HddsVersionInfo HDDS_VERSION_INFO =
|
||||||
|
new HddsVersionInfo("hdds");
|
||||||
/**
|
/**
|
||||||
* Get the HDDS version.
|
* Get the HDDS version.
|
||||||
* @return the Hdds version string, eg. "0.6.3-dev"
|
* @return the Hdds version string, eg. "0.6.3-dev"
|
||||||
|
@ -55,7 +55,6 @@
|
|||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
@ -232,7 +232,8 @@ public static HddsDatanodeService createHddsDatanodeService(
|
|||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
try {
|
try {
|
||||||
if (DFSUtil.parseHelpArgument(args, "Starts HDDS Datanode", System.out, false)) {
|
if (DFSUtil.parseHelpArgument(
|
||||||
|
args, "Starts HDDS Datanode", System.out, false)) {
|
||||||
System.exit(0);
|
System.exit(0);
|
||||||
}
|
}
|
||||||
Configuration conf = new OzoneConfiguration();
|
Configuration conf = new OzoneConfiguration();
|
||||||
|
@ -43,7 +43,6 @@
|
|||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import org.yaml.snakeyaml.Yaml;
|
import org.yaml.snakeyaml.Yaml;
|
||||||
|
|
||||||
@ -54,8 +53,6 @@
|
|||||||
.Result.NO_SUCH_ALGORITHM;
|
.Result.NO_SUCH_ALGORITHM;
|
||||||
import static org.apache.hadoop.ozone.container.common.impl.ContainerData
|
import static org.apache.hadoop.ozone.container.common.impl.ContainerData
|
||||||
.CHARSET_ENCODING;
|
.CHARSET_ENCODING;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A set of helper functions to create proper responses.
|
* A set of helper functions to create proper responses.
|
||||||
@ -75,14 +72,13 @@ private ContainerUtils() {
|
|||||||
* @return ContainerCommand Response Builder.
|
* @return ContainerCommand Response Builder.
|
||||||
*/
|
*/
|
||||||
public static ContainerCommandResponseProto.Builder
|
public static ContainerCommandResponseProto.Builder
|
||||||
getContainerCommandResponse(
|
getContainerCommandResponse(
|
||||||
ContainerCommandRequestProto request, Result result, String message) {
|
ContainerCommandRequestProto request, Result result, String message) {
|
||||||
return
|
return ContainerCommandResponseProto.newBuilder()
|
||||||
ContainerCommandResponseProto.newBuilder()
|
.setCmdType(request.getCmdType())
|
||||||
.setCmdType(request.getCmdType())
|
.setTraceID(request.getTraceID())
|
||||||
.setTraceID(request.getTraceID())
|
.setResult(result)
|
||||||
.setResult(result)
|
.setMessage(message);
|
||||||
.setMessage(message);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -287,7 +283,7 @@ public static String getChecksum(String containerDataYamlStr)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the .container file from the containerBaseDir
|
* Get the .container file from the containerBaseDir.
|
||||||
* @param containerBaseDir container base directory. The name of this
|
* @param containerBaseDir container base directory. The name of this
|
||||||
* directory is same as the containerID
|
* directory is same as the containerID
|
||||||
* @return the .container file
|
* @return the .container file
|
||||||
@ -301,7 +297,7 @@ public static File getContainerFile(File containerBaseDir) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ContainerID can be decoded from the container base directory name
|
* ContainerID can be decoded from the container base directory name.
|
||||||
*/
|
*/
|
||||||
public static long getContainerID(File containerBaseDir) {
|
public static long getContainerID(File containerBaseDir) {
|
||||||
return Long.parseLong(containerBaseDir.getName());
|
return Long.parseLong(containerBaseDir.getName());
|
||||||
|
@ -132,7 +132,7 @@ public Iterator<Map.Entry<Long, Container>> getContainerMapIterator() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a copy of the containerMap
|
* Return a copy of the containerMap.
|
||||||
* @return containerMap
|
* @return containerMap
|
||||||
*/
|
*/
|
||||||
public Map<Long, Container> getContainerMap() {
|
public Map<Long, Container> getContainerMap() {
|
||||||
|
@ -45,10 +45,12 @@ public class OpenContainerBlockMap {
|
|||||||
/**
|
/**
|
||||||
* Map: localId -> KeyData.
|
* Map: localId -> KeyData.
|
||||||
*
|
*
|
||||||
* In order to support {@link #getAll()}, the update operations are synchronized.
|
* In order to support {@link #getAll()}, the update operations are
|
||||||
|
* synchronized.
|
||||||
*/
|
*/
|
||||||
static class KeyDataMap {
|
static class KeyDataMap {
|
||||||
private final ConcurrentMap<Long, KeyData> blocks = new ConcurrentHashMap<>();
|
private final ConcurrentMap<Long, KeyData> blocks =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
KeyData get(long localId) {
|
KeyData get(long localId) {
|
||||||
return blocks.get(localId);
|
return blocks.get(localId);
|
||||||
@ -59,7 +61,8 @@ synchronized int removeAndGetSize(long localId) {
|
|||||||
return blocks.size();
|
return blocks.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized KeyData computeIfAbsent(long localId, Function<Long, KeyData> f) {
|
synchronized KeyData computeIfAbsent(
|
||||||
|
long localId, Function<Long, KeyData> f) {
|
||||||
return blocks.computeIfAbsent(localId, f);
|
return blocks.computeIfAbsent(localId, f);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,7 +79,8 @@ synchronized List<KeyData> getAll() {
|
|||||||
*
|
*
|
||||||
* For now, we will track all open blocks of a container in the blockMap.
|
* For now, we will track all open blocks of a container in the blockMap.
|
||||||
*/
|
*/
|
||||||
private final ConcurrentMap<Long, KeyDataMap> containers = new ConcurrentHashMap<>();
|
private final ConcurrentMap<Long, KeyDataMap> containers =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes the Container matching with specified containerId.
|
* Removes the Container matching with specified containerId.
|
||||||
@ -109,7 +113,7 @@ public void removeChunk(BlockID blockID, ChunkInfo chunkInfo) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* returns the list of open to the openContainerBlockMap
|
* Returns the list of open to the openContainerBlockMap.
|
||||||
* @param containerId container id
|
* @param containerId container id
|
||||||
* @return List of open Keys(blocks)
|
* @return List of open Keys(blocks)
|
||||||
*/
|
*/
|
||||||
@ -130,15 +134,14 @@ public void removeFromKeyMap(BlockID blockID) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if the block exists in the map, false otherwise
|
* Returns true if the block exists in the map, false otherwise.
|
||||||
*
|
*
|
||||||
* @param blockID
|
* @param blockID
|
||||||
* @return True, if it exists, false otherwise
|
* @return True, if it exists, false otherwise
|
||||||
*/
|
*/
|
||||||
public boolean checkIfBlockExists(BlockID blockID) {
|
public boolean checkIfBlockExists(BlockID blockID) {
|
||||||
KeyDataMap keyDataMap = containers.get(blockID.getContainerID());
|
KeyDataMap keyDataMap = containers.get(blockID.getContainerID());
|
||||||
return keyDataMap == null ? false :
|
return keyDataMap != null && keyDataMap.get(blockID.getLocalID()) != null;
|
||||||
keyDataMap.get(blockID.getLocalID()) != null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -71,10 +71,10 @@
|
|||||||
*/
|
*/
|
||||||
public final class XceiverServerRatis implements XceiverServerSpi {
|
public final class XceiverServerRatis implements XceiverServerSpi {
|
||||||
static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
|
static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
|
||||||
private static final AtomicLong callIdCounter = new AtomicLong();
|
private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
|
||||||
|
|
||||||
private static long nextCallId() {
|
private static long nextCallId() {
|
||||||
return callIdCounter.getAndIncrement() & Long.MAX_VALUE;
|
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
private final int port;
|
private final int port;
|
||||||
@ -307,6 +307,6 @@ private RaftClientRequest createRaftClientRequest(
|
|||||||
RaftClientRequest.Type type) {
|
RaftClientRequest.Type type) {
|
||||||
return new RaftClientRequest(clientId, server.getId(),
|
return new RaftClientRequest(clientId, server.getId(),
|
||||||
PipelineID.getFromProtobuf(pipelineID).getRaftGroupID(),
|
PipelineID.getFromProtobuf(pipelineID).getRaftGroupID(),
|
||||||
nextCallId(),0, Message.valueOf(request.toByteString()), type);
|
nextCallId(), 0, Message.valueOf(request.toByteString()), type);
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -67,8 +67,8 @@ public interface KeyManager {
|
|||||||
* @param count - Number of keys to return.
|
* @param count - Number of keys to return.
|
||||||
* @return List of Keys that match the criteria.
|
* @return List of Keys that match the criteria.
|
||||||
*/
|
*/
|
||||||
List<KeyData> listKey(Container container, long startLocalID, int count) throws
|
List<KeyData> listKey(Container container, long startLocalID, int count)
|
||||||
IOException;
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the last committed block length for the block.
|
* Returns the last committed block length for the block.
|
||||||
|
@ -109,23 +109,23 @@ public static CommandStatusBuilder newBuilder() {
|
|||||||
return new CommandStatusBuilder();
|
return new CommandStatusBuilder();
|
||||||
}
|
}
|
||||||
|
|
||||||
public CommandStatusBuilder setType(Type type) {
|
public CommandStatusBuilder setType(Type commandType) {
|
||||||
this.type = type;
|
this.type = commandType;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public CommandStatusBuilder setCmdId(Long cmdId) {
|
public CommandStatusBuilder setCmdId(Long commandId) {
|
||||||
this.cmdId = cmdId;
|
this.cmdId = commandId;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public CommandStatusBuilder setStatus(Status status) {
|
public CommandStatusBuilder setStatus(Status commandStatus) {
|
||||||
this.status = status;
|
this.status = commandStatus;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public CommandStatusBuilder setMsg(String msg) {
|
public CommandStatusBuilder setMsg(String message) {
|
||||||
this.msg = msg;
|
this.msg = message;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -193,11 +193,13 @@ private void sleepIfNeeded() {
|
|||||||
rpcCount.incrementAndGet();
|
rpcCount.incrementAndGet();
|
||||||
heartbeatCount.incrementAndGet();
|
heartbeatCount.incrementAndGet();
|
||||||
if(heartbeat.hasCommandStatusReport()){
|
if(heartbeat.hasCommandStatusReport()){
|
||||||
cmdStatusList.addAll(heartbeat.getCommandStatusReport().getCmdStatusList());
|
cmdStatusList.addAll(heartbeat.getCommandStatusReport()
|
||||||
|
.getCmdStatusList());
|
||||||
commandStatusReport.incrementAndGet();
|
commandStatusReport.incrementAndGet();
|
||||||
}
|
}
|
||||||
sleepIfNeeded();
|
sleepIfNeeded();
|
||||||
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(scmCommandRequests)
|
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(
|
||||||
|
scmCommandRequests)
|
||||||
.setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
|
.setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
@ -19,17 +19,12 @@
|
|||||||
package org.apache.hadoop.ozone.container.common.interfaces;
|
package org.apache.hadoop.ozone.container.common.interfaces;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
||||||
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
|
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
|
||||||
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
|
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
|
||||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
|
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -37,8 +32,6 @@
|
|||||||
import org.junit.rules.Timeout;
|
import org.junit.rules.Timeout;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests Handler interface.
|
* Tests Handler interface.
|
||||||
*/
|
*/
|
||||||
|
@ -21,8 +21,6 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.hdds.protocol.proto
|
|
||||||
.StorageContainerDatanodeProtocolProtos.ContainerInfo;
|
|
||||||
import org.apache.hadoop.hdds.protocol.proto
|
import org.apache.hadoop.hdds.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.ContainerAction;
|
.StorageContainerDatanodeProtocolProtos.ContainerAction;
|
||||||
import org.apache.hadoop.hdds.protocol.proto
|
import org.apache.hadoop.hdds.protocol.proto
|
||||||
|
@ -20,7 +20,6 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.fs.GetSpaceUsed;
|
import org.apache.hadoop.fs.GetSpaceUsed;
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||||
@ -43,8 +42,8 @@ public class TestRoundRobinVolumeChoosingPolicy {
|
|||||||
private List<HddsVolume> volumes;
|
private List<HddsVolume> volumes;
|
||||||
|
|
||||||
private final String baseDir = MiniDFSCluster.getBaseDirectory();
|
private final String baseDir = MiniDFSCluster.getBaseDirectory();
|
||||||
private final String volume1 = baseDir + "disk1";
|
private final String volume1 = baseDir + "disk1";
|
||||||
private final String volume2 = baseDir + "disk2";
|
private final String volume2 = baseDir + "disk2";
|
||||||
private static final String DUMMY_IP_ADDR = "0.0.0.0";
|
private static final String DUMMY_IP_ADDR = "0.0.0.0";
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -62,7 +62,8 @@ public void setUp() throws Exception {
|
|||||||
conf = new OzoneConfiguration();
|
conf = new OzoneConfiguration();
|
||||||
conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, folder.getRoot()
|
conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, folder.getRoot()
|
||||||
.getAbsolutePath());
|
.getAbsolutePath());
|
||||||
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, folder.newFolder().getAbsolutePath());
|
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
|
||||||
|
folder.newFolder().getAbsolutePath());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -180,9 +180,11 @@ public synchronized boolean remove(TIMEOUT_PAYLOAD payload) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void onTimeout(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
|
protected abstract void onTimeout(
|
||||||
|
EventPublisher publisher, TIMEOUT_PAYLOAD payload);
|
||||||
|
|
||||||
protected abstract void onFinished(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
|
protected abstract void onFinished(
|
||||||
|
EventPublisher publisher, TIMEOUT_PAYLOAD payload);
|
||||||
|
|
||||||
public List<TIMEOUT_PAYLOAD> getTimeoutEvents(
|
public List<TIMEOUT_PAYLOAD> getTimeoutEvents(
|
||||||
Predicate<? super TIMEOUT_PAYLOAD> predicate) {
|
Predicate<? super TIMEOUT_PAYLOAD> predicate) {
|
||||||
|
@ -22,9 +22,6 @@
|
|||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -161,11 +161,11 @@ private void preAllocateContainers(int count, ReplicationType type,
|
|||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
for (int i = 0; i < count; i++) {
|
for (int i = 0; i < count; i++) {
|
||||||
ContainerWithPipeline containerWithPipeline = null;
|
ContainerWithPipeline containerWithPipeline;
|
||||||
try {
|
try {
|
||||||
// TODO: Fix this later when Ratis is made the Default.
|
// TODO: Fix this later when Ratis is made the Default.
|
||||||
containerWithPipeline = containerManager.allocateContainer(type, factor,
|
containerWithPipeline = containerManager.allocateContainer(
|
||||||
owner);
|
type, factor, owner);
|
||||||
|
|
||||||
if (containerWithPipeline == null) {
|
if (containerWithPipeline == null) {
|
||||||
LOG.warn("Unable to allocate container.");
|
LOG.warn("Unable to allocate container.");
|
||||||
@ -293,12 +293,12 @@ public AllocatedBlock allocateBlock(final long size,
|
|||||||
|
|
||||||
private String getChannelName(ReplicationType type) {
|
private String getChannelName(ReplicationType type) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case RATIS:
|
case RATIS:
|
||||||
return "RA" + UUID.randomUUID().toString().substring(3);
|
return "RA" + UUID.randomUUID().toString().substring(3);
|
||||||
case STAND_ALONE:
|
case STAND_ALONE:
|
||||||
return "SA" + UUID.randomUUID().toString().substring(3);
|
return "SA" + UUID.randomUUID().toString().substring(3);
|
||||||
default:
|
default:
|
||||||
return "RA" + UUID.randomUUID().toString().substring(3);
|
return "RA" + UUID.randomUUID().toString().substring(3);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,7 +232,8 @@ public void commitTransactions(
|
|||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
Set<UUID> dnsWithCommittedTxn;
|
Set<UUID> dnsWithCommittedTxn;
|
||||||
for (DeleteBlockTransactionResult transactionResult : transactionResults) {
|
for (DeleteBlockTransactionResult transactionResult :
|
||||||
|
transactionResults) {
|
||||||
if (isTransactionFailed(transactionResult)) {
|
if (isTransactionFailed(transactionResult)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -109,8 +109,8 @@ public BackgroundTaskQueue getTasks() {
|
|||||||
|
|
||||||
public void handlePendingDeletes(PendingDeleteStatusList deletionStatusList) {
|
public void handlePendingDeletes(PendingDeleteStatusList deletionStatusList) {
|
||||||
DatanodeDetails dnDetails = deletionStatusList.getDatanodeDetails();
|
DatanodeDetails dnDetails = deletionStatusList.getDatanodeDetails();
|
||||||
for (PendingDeleteStatusList.PendingDeleteStatus deletionStatus : deletionStatusList
|
for (PendingDeleteStatusList.PendingDeleteStatus deletionStatus :
|
||||||
.getPendingDeleteStatuses()) {
|
deletionStatusList.getPendingDeleteStatuses()) {
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Block deletion txnID mismatch in datanode {} for containerID {}."
|
"Block deletion txnID mismatch in datanode {} for containerID {}."
|
||||||
+ " Datanode delete txnID: {}, SCM txnID: {}",
|
+ " Datanode delete txnID: {}, SCM txnID: {}",
|
||||||
|
@ -62,8 +62,8 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) {
|
|||||||
containerManager.getContainerWithPipeline(containerID.getId());
|
containerManager.getContainerWithPipeline(containerID.getId());
|
||||||
info = containerWithPipeline.getContainerInfo();
|
info = containerWithPipeline.getContainerInfo();
|
||||||
if (info == null) {
|
if (info == null) {
|
||||||
LOG.error("Failed to update the container state. Container with id : {} "
|
LOG.error("Failed to update the container state. Container with id : {}"
|
||||||
+ "does not exist", containerID.getId());
|
+ " does not exist", containerID.getId());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -600,8 +600,8 @@ private HddsProtos.SCMContainerInfo reconcileState(
|
|||||||
.setReplicationType(knownState.getReplicationType())
|
.setReplicationType(knownState.getReplicationType())
|
||||||
.setReplicationFactor(knownState.getReplicationFactor());
|
.setReplicationFactor(knownState.getReplicationFactor());
|
||||||
|
|
||||||
// TODO: If current state doesn't have this DN in list of DataNodes with replica
|
// TODO: If current state doesn't have this DN in list of DataNodes with
|
||||||
// then add it in list of replicas.
|
// replica then add it in list of replicas.
|
||||||
|
|
||||||
// If used size is greater than allocated size, we will be updating
|
// If used size is greater than allocated size, we will be updating
|
||||||
// allocated size with used size. This update is done as a fallback
|
// allocated size with used size. This update is done as a fallback
|
||||||
|
@ -288,9 +288,10 @@ private void initializeStateMachine() {
|
|||||||
* @return ContainerWithPipeline
|
* @return ContainerWithPipeline
|
||||||
* @throws IOException on Failure.
|
* @throws IOException on Failure.
|
||||||
*/
|
*/
|
||||||
public ContainerWithPipeline allocateContainer(PipelineSelector selector, HddsProtos
|
public ContainerWithPipeline allocateContainer(PipelineSelector selector,
|
||||||
.ReplicationType type, HddsProtos.ReplicationFactor replicationFactor,
|
HddsProtos.ReplicationType type,
|
||||||
String owner) throws IOException {
|
HddsProtos.ReplicationFactor replicationFactor, String owner)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
Pipeline pipeline = selector.getReplicationPipeline(type,
|
Pipeline pipeline = selector.getReplicationPipeline(type,
|
||||||
replicationFactor);
|
replicationFactor);
|
||||||
|
@ -182,7 +182,7 @@ public void stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Event for the ReplicationCommandWatcher to repeate the embedded request
|
* Event for the ReplicationCommandWatcher to repeate the embedded request.
|
||||||
* in case fof timeout.
|
* in case fof timeout.
|
||||||
*/
|
*/
|
||||||
public static class ReplicationRequestToRepeat
|
public static class ReplicationRequestToRepeat
|
||||||
|
@ -381,7 +381,7 @@ NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns Open containers in the SCM by the Pipeline
|
* Returns Open containers in the SCM by the Pipeline.
|
||||||
*
|
*
|
||||||
* @param pipelineID - Pipeline id.
|
* @param pipelineID - Pipeline id.
|
||||||
* @return NavigableSet<ContainerID>
|
* @return NavigableSet<ContainerID>
|
||||||
|
@ -84,8 +84,8 @@ public void insertNewDatanode(UUID datanodeID, Set<ContainerID> containerIDs)
|
|||||||
* @throws SCMException - if we don't know about this datanode, for new DN
|
* @throws SCMException - if we don't know about this datanode, for new DN
|
||||||
* use insertNewDatanode.
|
* use insertNewDatanode.
|
||||||
*/
|
*/
|
||||||
public void setContainersForDatanode(UUID datanodeID, Set<ContainerID> containers)
|
public void setContainersForDatanode(UUID datanodeID,
|
||||||
throws SCMException {
|
Set<ContainerID> containers) throws SCMException {
|
||||||
Preconditions.checkNotNull(datanodeID);
|
Preconditions.checkNotNull(datanodeID);
|
||||||
Preconditions.checkNotNull(containers);
|
Preconditions.checkNotNull(containers);
|
||||||
if (dn2ContainerMap
|
if (dn2ContainerMap
|
||||||
|
@ -17,8 +17,6 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.hdds.scm.node.states;
|
package org.apache.hadoop.hdds.scm.node.states;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This exception represents that the node that is being accessed does not
|
* This exception represents that the node that is being accessed does not
|
||||||
* exist in NodeStateMap.
|
* exist in NodeStateMap.
|
||||||
|
@ -94,7 +94,8 @@ ReportResult build() {
|
|||||||
if (nullSafeMissingContainers == null) {
|
if (nullSafeMissingContainers == null) {
|
||||||
nullSafeMissingContainers = Collections.emptySet();
|
nullSafeMissingContainers = Collections.emptySet();
|
||||||
}
|
}
|
||||||
return new ReportResult(status, nullSafeMissingContainers, nullSafeNewContainers);
|
return new ReportResult(status, nullSafeMissingContainers,
|
||||||
|
nullSafeNewContainers);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,39 +23,31 @@
|
|||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Collections;
|
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
|
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUPLICATE_DATANODE;
|
||||||
.DUPLICATE_DATANODE;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This data structure maintains the list of pipelines which the given datanode
|
* This data structure maintains the list of pipelines which the given datanode is a part of. This
|
||||||
* is a part of.
|
* information will be added whenever a new pipeline allocation happens.
|
||||||
* This information will be added whenever a new pipeline allocation happens.
|
|
||||||
*
|
*
|
||||||
* TODO: this information needs to be regenerated from pipeline reports on
|
* <p>TODO: this information needs to be regenerated from pipeline reports on SCM restart
|
||||||
* SCM restart
|
|
||||||
*/
|
*/
|
||||||
public class Node2PipelineMap {
|
public class Node2PipelineMap {
|
||||||
private final Map<UUID, Set<Pipeline>> dn2PipelineMap;
|
private final Map<UUID, Set<Pipeline>> dn2PipelineMap;
|
||||||
|
|
||||||
/**
|
/** Constructs a Node2PipelineMap Object. */
|
||||||
* Constructs a Node2PipelineMap Object.
|
|
||||||
*/
|
|
||||||
public Node2PipelineMap() {
|
public Node2PipelineMap() {
|
||||||
dn2PipelineMap = new ConcurrentHashMap<>();
|
dn2PipelineMap = new ConcurrentHashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if this a datanode that is already tracked by
|
* Returns true if this a datanode that is already tracked by Node2PipelineMap.
|
||||||
* Node2PipelineMap.
|
|
||||||
*
|
*
|
||||||
* @param datanodeID - UUID of the Datanode.
|
* @param datanodeID - UUID of the Datanode.
|
||||||
* @return True if this is tracked, false if this map does not know about it.
|
* @return True if this is tracked, false if this map does not know about it.
|
||||||
@ -71,18 +63,17 @@ private boolean isKnownDatanode(UUID datanodeID) {
|
|||||||
* @param datanodeID -- Datanode UUID
|
* @param datanodeID -- Datanode UUID
|
||||||
* @param pipelines - set of pipelines.
|
* @param pipelines - set of pipelines.
|
||||||
*/
|
*/
|
||||||
private void insertNewDatanode(UUID datanodeID, Set<Pipeline> pipelines)
|
private void insertNewDatanode(UUID datanodeID, Set<Pipeline> pipelines) throws SCMException {
|
||||||
throws SCMException {
|
|
||||||
Preconditions.checkNotNull(pipelines);
|
Preconditions.checkNotNull(pipelines);
|
||||||
Preconditions.checkNotNull(datanodeID);
|
Preconditions.checkNotNull(datanodeID);
|
||||||
if(dn2PipelineMap.putIfAbsent(datanodeID, pipelines) != null) {
|
if (dn2PipelineMap.putIfAbsent(datanodeID, pipelines) != null) {
|
||||||
throw new SCMException("Node already exists in the map",
|
throw new SCMException("Node already exists in the map", DUPLICATE_DATANODE);
|
||||||
DUPLICATE_DATANODE);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes datanode Entry from the map.
|
* Removes datanode Entry from the map.
|
||||||
|
*
|
||||||
* @param datanodeID - Datanode ID.
|
* @param datanodeID - Datanode ID.
|
||||||
*/
|
*/
|
||||||
public synchronized void removeDatanode(UUID datanodeID) {
|
public synchronized void removeDatanode(UUID datanodeID) {
|
||||||
@ -98,20 +89,19 @@ public synchronized void removeDatanode(UUID datanodeID) {
|
|||||||
*/
|
*/
|
||||||
public Set<Pipeline> getPipelines(UUID datanode) {
|
public Set<Pipeline> getPipelines(UUID datanode) {
|
||||||
Preconditions.checkNotNull(datanode);
|
Preconditions.checkNotNull(datanode);
|
||||||
return dn2PipelineMap.computeIfPresent(datanode, (k, v) ->
|
return dn2PipelineMap.computeIfPresent(datanode, (k, v) -> Collections.unmodifiableSet(v));
|
||||||
Collections.unmodifiableSet(v));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds a pipeline entry to a given dataNode in the map.
|
* Adds a pipeline entry to a given dataNode in the map.
|
||||||
|
*
|
||||||
* @param pipeline Pipeline to be added
|
* @param pipeline Pipeline to be added
|
||||||
*/
|
*/
|
||||||
public synchronized void addPipeline(Pipeline pipeline) {
|
public synchronized void addPipeline(Pipeline pipeline) {
|
||||||
for (DatanodeDetails details : pipeline.getDatanodes().values()) {
|
for (DatanodeDetails details : pipeline.getDatanodes().values()) {
|
||||||
UUID dnId = details.getUuid();
|
UUID dnId = details.getUuid();
|
||||||
dn2PipelineMap
|
dn2PipelineMap
|
||||||
.computeIfAbsent(dnId,
|
.computeIfAbsent(dnId, k -> Collections.synchronizedSet(new HashSet<>()))
|
||||||
k -> Collections.synchronizedSet(new HashSet<>()))
|
|
||||||
.add(pipeline);
|
.add(pipeline);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -119,8 +109,12 @@ public synchronized void addPipeline(Pipeline pipeline) {
|
|||||||
public synchronized void removePipeline(Pipeline pipeline) {
|
public synchronized void removePipeline(Pipeline pipeline) {
|
||||||
for (DatanodeDetails details : pipeline.getDatanodes().values()) {
|
for (DatanodeDetails details : pipeline.getDatanodes().values()) {
|
||||||
UUID dnId = details.getUuid();
|
UUID dnId = details.getUuid();
|
||||||
dn2PipelineMap.computeIfPresent(dnId,
|
dn2PipelineMap.computeIfPresent(
|
||||||
(k, v) -> {v.remove(pipeline); return v;});
|
dnId,
|
||||||
|
(k, v) -> {
|
||||||
|
v.remove(pipeline);
|
||||||
|
return v;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,7 +111,7 @@ public abstract Pipeline allocatePipeline(
|
|||||||
ReplicationFactor replicationFactor);
|
ReplicationFactor replicationFactor);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize the pipeline
|
* Initialize the pipeline.
|
||||||
* TODO: move the initialization to Ozone Client later
|
* TODO: move the initialization to Ozone Client later
|
||||||
*/
|
*/
|
||||||
public abstract void initializePipeline(Pipeline pipeline) throws IOException;
|
public abstract void initializePipeline(Pipeline pipeline) throws IOException;
|
||||||
@ -176,7 +176,7 @@ public Pipeline createPipeline(ReplicationFactor replicationFactor,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove the pipeline from active allocation
|
* Remove the pipeline from active allocation.
|
||||||
* @param pipeline pipeline to be finalized
|
* @param pipeline pipeline to be finalized
|
||||||
*/
|
*/
|
||||||
public synchronized void finalizePipeline(Pipeline pipeline) {
|
public synchronized void finalizePipeline(Pipeline pipeline) {
|
||||||
@ -193,7 +193,7 @@ public void closePipeline(Pipeline pipeline) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* list members in the pipeline .
|
* list members in the pipeline.
|
||||||
* @return the datanode
|
* @return the datanode
|
||||||
*/
|
*/
|
||||||
public abstract List<DatanodeDetails> getMembers(PipelineID pipelineID)
|
public abstract List<DatanodeDetails> getMembers(PipelineID pipelineID)
|
||||||
|
@ -126,7 +126,7 @@ public PipelineSelector(NodeManager nodeManager,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Event and State Transition Mapping:
|
* Event and State Transition Mapping.
|
||||||
*
|
*
|
||||||
* State: ALLOCATED ---------------> CREATING
|
* State: ALLOCATED ---------------> CREATING
|
||||||
* Event: CREATE
|
* Event: CREATE
|
||||||
@ -293,7 +293,7 @@ public Pipeline getReplicationPipeline(ReplicationType replicationType,
|
|||||||
pipeline = manager.getPipeline(replicationFactor, replicationType);
|
pipeline = manager.getPipeline(replicationFactor, replicationType);
|
||||||
} else {
|
} else {
|
||||||
// if a new pipeline is created, initialize its state machine
|
// if a new pipeline is created, initialize its state machine
|
||||||
updatePipelineState(pipeline,HddsProtos.LifeCycleEvent.CREATE);
|
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATE);
|
||||||
|
|
||||||
//TODO: move the initialization of pipeline to Ozone Client
|
//TODO: move the initialization of pipeline to Ozone Client
|
||||||
manager.initializePipeline(pipeline);
|
manager.initializePipeline(pipeline);
|
||||||
@ -334,7 +334,8 @@ public void finalizePipeline(Pipeline pipeline) throws IOException {
|
|||||||
/**
|
/**
|
||||||
* Close a given pipeline.
|
* Close a given pipeline.
|
||||||
*/
|
*/
|
||||||
public void closePipelineIfNoOpenContainers(Pipeline pipeline) throws IOException {
|
public void closePipelineIfNoOpenContainers(Pipeline pipeline)
|
||||||
|
throws IOException {
|
||||||
if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) {
|
if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -165,7 +165,8 @@ public ContainerInfo getContainer(long containerID) throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ContainerWithPipeline getContainerWithPipeline(long containerID) throws IOException {
|
public ContainerWithPipeline getContainerWithPipeline(long containerID)
|
||||||
|
throws IOException {
|
||||||
String remoteUser = getRpcRemoteUsername();
|
String remoteUser = getRpcRemoteUsername();
|
||||||
getScm().checkAdminAccess(remoteUser);
|
getScm().checkAdminAccess(remoteUser);
|
||||||
return scm.getScmContainerManager()
|
return scm.getScmContainerManager()
|
||||||
|
@ -74,7 +74,7 @@ private static DatanodeDetails createDatanodeDetails(UUID uuid) {
|
|||||||
+ "." + random.nextInt(256)
|
+ "." + random.nextInt(256)
|
||||||
+ "." + random.nextInt(256)
|
+ "." + random.nextInt(256)
|
||||||
+ "." + random.nextInt(256);
|
+ "." + random.nextInt(256);
|
||||||
return createDatanodeDetails(uuid.toString(), "localhost", ipAddress);
|
return createDatanodeDetails(uuid.toString(), "localhost", ipAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -259,12 +259,12 @@ public static StorageReportProto createStorageReport(UUID nodeId, String path,
|
|||||||
StorageTypeProto storageTypeProto =
|
StorageTypeProto storageTypeProto =
|
||||||
type == null ? StorageTypeProto.DISK : type;
|
type == null ? StorageTypeProto.DISK : type;
|
||||||
srb.setStorageType(storageTypeProto);
|
srb.setStorageType(storageTypeProto);
|
||||||
return srb.build();
|
return srb.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generates random container reports
|
* Generates random container reports.
|
||||||
*
|
*
|
||||||
* @return ContainerReportsProto
|
* @return ContainerReportsProto
|
||||||
*/
|
*/
|
||||||
@ -281,7 +281,7 @@ public static ContainerReportsProto getRandomContainerReports() {
|
|||||||
*/
|
*/
|
||||||
public static ContainerReportsProto getRandomContainerReports(
|
public static ContainerReportsProto getRandomContainerReports(
|
||||||
int numberOfContainers) {
|
int numberOfContainers) {
|
||||||
List<ContainerInfo> containerInfos = new ArrayList<>();
|
List<ContainerInfo> containerInfos = new ArrayList<>();
|
||||||
for (int i = 0; i < numberOfContainers; i++) {
|
for (int i = 0; i < numberOfContainers; i++) {
|
||||||
containerInfos.add(getRandomContainerInfo(i));
|
containerInfos.add(getRandomContainerInfo(i));
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,6 @@
|
|||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.Collections;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.GB;
|
import static org.apache.hadoop.ozone.OzoneConsts.GB;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.MB;
|
import static org.apache.hadoop.ozone.OzoneConsts.MB;
|
||||||
|
@ -102,8 +102,8 @@ private void setupContainerManager() throws IOException {
|
|||||||
ContainerInfo containerInfo =
|
ContainerInfo containerInfo =
|
||||||
new ContainerInfo.Builder().setContainerID(1).build();
|
new ContainerInfo.Builder().setContainerID(1).build();
|
||||||
Pipeline pipeline =
|
Pipeline pipeline =
|
||||||
new Pipeline(null, LifeCycleState.CLOSED, ReplicationType.RATIS,
|
new Pipeline(null, LifeCycleState.CLOSED,
|
||||||
ReplicationFactor.THREE, null);
|
ReplicationType.RATIS, ReplicationFactor.THREE, null);
|
||||||
pipeline.addMember(dnList.get(0));
|
pipeline.addMember(dnList.get(0));
|
||||||
pipeline.addMember(dnList.get(1));
|
pipeline.addMember(dnList.get(1));
|
||||||
pipeline.addMember(dnList.get(2));
|
pipeline.addMember(dnList.get(2));
|
||||||
@ -379,7 +379,8 @@ public void testDeletedBlockTransactions() throws IOException {
|
|||||||
Assert.assertTrue(transactions.isFull());
|
Assert.assertTrue(transactions.isFull());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void mockContainerInfo(long containerID, DatanodeDetails dd) throws IOException {
|
private void mockContainerInfo(long containerID, DatanodeDetails dd)
|
||||||
|
throws IOException {
|
||||||
Pipeline pipeline =
|
Pipeline pipeline =
|
||||||
new Pipeline("fake", LifeCycleState.OPEN,
|
new Pipeline("fake", LifeCycleState.OPEN,
|
||||||
ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
|
ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
|
||||||
|
@ -48,12 +48,11 @@
|
|||||||
|
|
||||||
public class TestCommandStatusReportHandler implements EventPublisher {
|
public class TestCommandStatusReportHandler implements EventPublisher {
|
||||||
|
|
||||||
private static Logger LOG = LoggerFactory
|
private static final Logger LOG = LoggerFactory
|
||||||
.getLogger(TestCommandStatusReportHandler.class);
|
.getLogger(TestCommandStatusReportHandler.class);
|
||||||
private CommandStatusReportHandler cmdStatusReportHandler;
|
private CommandStatusReportHandler cmdStatusReportHandler;
|
||||||
private String storagePath = GenericTestUtils.getRandomizedTempPath()
|
private String storagePath = GenericTestUtils.getRandomizedTempPath()
|
||||||
.concat("/" + UUID.randomUUID().toString());
|
.concat("/" + UUID.randomUUID().toString());
|
||||||
;
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
@ -69,10 +68,9 @@ public void testCommandStatusReport() {
|
|||||||
.emptyList());
|
.emptyList());
|
||||||
cmdStatusReportHandler.onMessage(report, this);
|
cmdStatusReportHandler.onMessage(report, this);
|
||||||
assertFalse(logCapturer.getOutput().contains("DeleteBlockCommandStatus"));
|
assertFalse(logCapturer.getOutput().contains("DeleteBlockCommandStatus"));
|
||||||
assertFalse(logCapturer.getOutput().contains
|
assertFalse(logCapturer.getOutput().contains(
|
||||||
("CloseContainerCommandStatus"));
|
"CloseContainerCommandStatus"));
|
||||||
assertFalse(logCapturer.getOutput().contains
|
assertFalse(logCapturer.getOutput().contains("ReplicateCommandStatus"));
|
||||||
("ReplicateCommandStatus"));
|
|
||||||
|
|
||||||
|
|
||||||
report = this.getStatusReport(this.getCommandStatusList());
|
report = this.getStatusReport(this.getCommandStatusList());
|
||||||
@ -93,13 +91,13 @@ public void testCommandStatusReport() {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private CommandStatusReportFromDatanode getStatusReport(List<CommandStatus>
|
private CommandStatusReportFromDatanode getStatusReport(
|
||||||
reports) {
|
List<CommandStatus> reports) {
|
||||||
CommandStatusReportsProto report = TestUtils.createCommandStatusReport
|
CommandStatusReportsProto report = TestUtils.createCommandStatusReport(
|
||||||
(reports);
|
reports);
|
||||||
DatanodeDetails dn = TestUtils.randomDatanodeDetails();
|
DatanodeDetails dn = TestUtils.randomDatanodeDetails();
|
||||||
return new SCMDatanodeHeartbeatDispatcher.CommandStatusReportFromDatanode
|
return new SCMDatanodeHeartbeatDispatcher.CommandStatusReportFromDatanode(
|
||||||
(dn, report);
|
dn, report);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -37,7 +37,6 @@
|
|||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CREATE;
|
|
||||||
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CREATED;
|
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CREATED;
|
||||||
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
|
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
|
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
|
||||||
|
@ -178,8 +178,8 @@ public void testGetContainerWithPipeline() throws Exception {
|
|||||||
mapping
|
mapping
|
||||||
.updateContainerState(contInfo.getContainerID(), LifeCycleEvent.CLOSE);
|
.updateContainerState(contInfo.getContainerID(), LifeCycleEvent.CLOSE);
|
||||||
ContainerInfo finalContInfo = contInfo;
|
ContainerInfo finalContInfo = contInfo;
|
||||||
LambdaTestUtils.intercept(SCMException.class,"No entry exist for "
|
LambdaTestUtils.intercept(SCMException.class, "No entry exist for "
|
||||||
+ "containerId:" , () -> mapping.getContainerWithPipeline(
|
+ "containerId:", () -> mapping.getContainerWithPipeline(
|
||||||
finalContInfo.getContainerID()));
|
finalContInfo.getContainerID()));
|
||||||
|
|
||||||
mapping.getStateManager().getContainerStateMap()
|
mapping.getStateManager().getContainerStateMap()
|
||||||
@ -376,7 +376,8 @@ private ContainerInfo createContainer()
|
|||||||
@Test
|
@Test
|
||||||
public void testFlushAllContainers() throws IOException {
|
public void testFlushAllContainers() throws IOException {
|
||||||
ContainerInfo info = createContainer();
|
ContainerInfo info = createContainer();
|
||||||
List<ContainerInfo> containers = mapping.getStateManager().getAllContainers();
|
List<ContainerInfo> containers = mapping.getStateManager()
|
||||||
|
.getAllContainers();
|
||||||
Assert.assertTrue(containers.size() > 0);
|
Assert.assertTrue(containers.size() > 0);
|
||||||
mapping.flushContainerInfo();
|
mapping.flushContainerInfo();
|
||||||
}
|
}
|
||||||
|
@ -86,7 +86,7 @@ public void test() throws IOException {
|
|||||||
.setContainerID((Long) invocation.getArguments()[0])
|
.setContainerID((Long) invocation.getArguments()[0])
|
||||||
.setState(LifeCycleState.CLOSED)
|
.setState(LifeCycleState.CLOSED)
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
|
|
||||||
ContainerStateManager containerStateManager =
|
ContainerStateManager containerStateManager =
|
||||||
new ContainerStateManager(conf, mapping);
|
new ContainerStateManager(conf, mapping);
|
||||||
|
@ -52,13 +52,13 @@ public void chooseDatanodes() throws SCMException {
|
|||||||
.thenReturn(new ArrayList<>(datanodes));
|
.thenReturn(new ArrayList<>(datanodes));
|
||||||
|
|
||||||
when(mockNodeManager.getNodeStat(anyObject()))
|
when(mockNodeManager.getNodeStat(anyObject()))
|
||||||
.thenReturn(new SCMNodeMetric(100l, 0L, 100L));
|
.thenReturn(new SCMNodeMetric(100L, 0L, 100L));
|
||||||
when(mockNodeManager.getNodeStat(datanodes.get(2)))
|
when(mockNodeManager.getNodeStat(datanodes.get(2)))
|
||||||
.thenReturn(new SCMNodeMetric(100l, 90L, 10L));
|
.thenReturn(new SCMNodeMetric(100L, 90L, 10L));
|
||||||
when(mockNodeManager.getNodeStat(datanodes.get(3)))
|
when(mockNodeManager.getNodeStat(datanodes.get(3)))
|
||||||
.thenReturn(new SCMNodeMetric(100l, 80L, 20L));
|
.thenReturn(new SCMNodeMetric(100L, 80L, 20L));
|
||||||
when(mockNodeManager.getNodeStat(datanodes.get(4)))
|
when(mockNodeManager.getNodeStat(datanodes.get(4)))
|
||||||
.thenReturn(new SCMNodeMetric(100l, 70L, 30L));
|
.thenReturn(new SCMNodeMetric(100L, 70L, 30L));
|
||||||
|
|
||||||
SCMContainerPlacementCapacity scmContainerPlacementRandom =
|
SCMContainerPlacementCapacity scmContainerPlacementRandom =
|
||||||
new SCMContainerPlacementCapacity(mockNodeManager, conf);
|
new SCMContainerPlacementCapacity(mockNodeManager, conf);
|
||||||
|
@ -51,9 +51,9 @@ public void chooseDatanodes() throws SCMException {
|
|||||||
.thenReturn(new ArrayList<>(datanodes));
|
.thenReturn(new ArrayList<>(datanodes));
|
||||||
|
|
||||||
when(mockNodeManager.getNodeStat(anyObject()))
|
when(mockNodeManager.getNodeStat(anyObject()))
|
||||||
.thenReturn(new SCMNodeMetric(100l, 0l, 100l));
|
.thenReturn(new SCMNodeMetric(100L, 0L, 100L));
|
||||||
when(mockNodeManager.getNodeStat(datanodes.get(2)))
|
when(mockNodeManager.getNodeStat(datanodes.get(2)))
|
||||||
.thenReturn(new SCMNodeMetric(100l, 90l, 10l));
|
.thenReturn(new SCMNodeMetric(100L, 90L, 10L));
|
||||||
|
|
||||||
SCMContainerPlacementRandom scmContainerPlacementRandom =
|
SCMContainerPlacementRandom scmContainerPlacementRandom =
|
||||||
new SCMContainerPlacementRandom(mockNodeManager, conf);
|
new SCMContainerPlacementRandom(mockNodeManager, conf);
|
||||||
|
@ -21,7 +21,6 @@
|
|||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
|
||||||
@ -132,7 +131,7 @@ protected List<DatanodeDetails> getCurrentReplicas(
|
|||||||
//WHEN
|
//WHEN
|
||||||
|
|
||||||
queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
|
queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
|
||||||
new ReplicationRequest(1l, (short) 2, System.currentTimeMillis(),
|
new ReplicationRequest(1L, (short) 2, System.currentTimeMillis(),
|
||||||
(short) 3));
|
(short) 3));
|
||||||
|
|
||||||
Thread.sleep(500L);
|
Thread.sleep(500L);
|
||||||
@ -159,10 +158,8 @@ public void testCommandWatcher() throws InterruptedException, IOException {
|
|||||||
leaseManager.start();
|
leaseManager.start();
|
||||||
|
|
||||||
ReplicationManager replicationManager =
|
ReplicationManager replicationManager =
|
||||||
new ReplicationManager(containerPlacementPolicy, containerStateManager,
|
new ReplicationManager(containerPlacementPolicy,
|
||||||
|
containerStateManager, queue, leaseManager) {
|
||||||
|
|
||||||
queue, leaseManager) {
|
|
||||||
@Override
|
@Override
|
||||||
protected List<DatanodeDetails> getCurrentReplicas(
|
protected List<DatanodeDetails> getCurrentReplicas(
|
||||||
ReplicationRequest request) throws IOException {
|
ReplicationRequest request) throws IOException {
|
||||||
@ -172,7 +169,7 @@ protected List<DatanodeDetails> getCurrentReplicas(
|
|||||||
replicationManager.start();
|
replicationManager.start();
|
||||||
|
|
||||||
queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
|
queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
|
||||||
new ReplicationRequest(1l, (short) 2, System.currentTimeMillis(),
|
new ReplicationRequest(1L, (short) 2, System.currentTimeMillis(),
|
||||||
(short) 3));
|
(short) 3));
|
||||||
|
|
||||||
Thread.sleep(500L);
|
Thread.sleep(500L);
|
||||||
|
@ -92,8 +92,8 @@ public void testPollOp() throws InterruptedException {
|
|||||||
1, replicationQueue.size());
|
1, replicationQueue.size());
|
||||||
Assert.assertEquals(temp, msg5);
|
Assert.assertEquals(temp, msg5);
|
||||||
|
|
||||||
// Message 2 should be ordered before message 5 as both have same replication
|
// Message 2 should be ordered before message 5 as both have same
|
||||||
// number but message 2 has earlier timestamp.
|
// replication number but message 2 has earlier timestamp.
|
||||||
temp = replicationQueue.take();
|
temp = replicationQueue.take();
|
||||||
Assert.assertEquals("Should have 0 objects",
|
Assert.assertEquals("Should have 0 objects",
|
||||||
replicationQueue.size(), 0);
|
replicationQueue.size(), 0);
|
||||||
|
@ -32,8 +32,6 @@
|
|||||||
.SCMContainerPlacementCapacity;
|
.SCMContainerPlacementCapacity;
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.hdds.protocol.proto
|
|
||||||
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
|
|
||||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||||
import org.apache.hadoop.hdds.server.events.EventQueue;
|
import org.apache.hadoop.hdds.server.events.EventQueue;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
@ -158,7 +156,8 @@ public void testContainerPlacementCapacity() throws IOException,
|
|||||||
|
|
||||||
assertTrue(nodeManager.isOutOfChillMode());
|
assertTrue(nodeManager.isOutOfChillMode());
|
||||||
|
|
||||||
ContainerWithPipeline containerWithPipeline = containerManager.allocateContainer(
|
ContainerWithPipeline containerWithPipeline = containerManager
|
||||||
|
.allocateContainer(
|
||||||
xceiverClientManager.getType(),
|
xceiverClientManager.getType(),
|
||||||
xceiverClientManager.getFactor(), "OZONE");
|
xceiverClientManager.getFactor(), "OZONE");
|
||||||
assertEquals(xceiverClientManager.getFactor().getNumber(),
|
assertEquals(xceiverClientManager.getFactor().getNumber(),
|
||||||
|
@ -424,7 +424,8 @@ public void testScmDetectStaleAndDeadNode() throws IOException,
|
|||||||
List<DatanodeDetails> nodeList = createNodeSet(nodeManager, nodeCount);
|
List<DatanodeDetails> nodeList = createNodeSet(nodeManager, nodeCount);
|
||||||
|
|
||||||
|
|
||||||
DatanodeDetails staleNode = TestUtils.createRandomDatanodeAndRegister(nodeManager);
|
DatanodeDetails staleNode = TestUtils.createRandomDatanodeAndRegister(
|
||||||
|
nodeManager);
|
||||||
|
|
||||||
// Heartbeat once
|
// Heartbeat once
|
||||||
nodeManager.processHeartbeat(staleNode);
|
nodeManager.processHeartbeat(staleNode);
|
||||||
|
@ -37,13 +37,12 @@
|
|||||||
|
|
||||||
public class TestNodeReportHandler implements EventPublisher {
|
public class TestNodeReportHandler implements EventPublisher {
|
||||||
|
|
||||||
private static Logger LOG = LoggerFactory
|
private static final Logger LOG = LoggerFactory
|
||||||
.getLogger(TestNodeReportHandler.class);
|
.getLogger(TestNodeReportHandler.class);
|
||||||
private NodeReportHandler nodeReportHandler;
|
private NodeReportHandler nodeReportHandler;
|
||||||
private SCMNodeManager nodeManager;
|
private SCMNodeManager nodeManager;
|
||||||
private String storagePath = GenericTestUtils.getRandomizedTempPath()
|
private String storagePath = GenericTestUtils.getRandomizedTempPath()
|
||||||
.concat("/" + UUID.randomUUID().toString());
|
.concat("/" + UUID.randomUUID().toString());
|
||||||
;
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void resetEventCollector() throws IOException {
|
public void resetEventCollector() throws IOException {
|
||||||
|
@ -121,7 +121,7 @@ public static void setUp() throws Exception {
|
|||||||
config.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
|
config.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
|
||||||
config
|
config
|
||||||
.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true);
|
.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true);
|
||||||
config.set(HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL,"1s");
|
config.set(HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL, "1s");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -228,8 +228,8 @@ public void testGetVersionToInvalidEndpoint() throws Exception {
|
|||||||
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
|
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
|
||||||
nonExistentServerAddress, 1000)) {
|
nonExistentServerAddress, 1000)) {
|
||||||
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
||||||
OzoneContainer ozoneContainer = new OzoneContainer(TestUtils.randomDatanodeDetails(),
|
OzoneContainer ozoneContainer = new OzoneContainer(
|
||||||
conf, null);
|
TestUtils.randomDatanodeDetails(), conf, null);
|
||||||
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
||||||
conf, ozoneContainer);
|
conf, ozoneContainer);
|
||||||
EndpointStateMachine.EndPointStates newState = versionTask.call();
|
EndpointStateMachine.EndPointStates newState = versionTask.call();
|
||||||
@ -405,7 +405,8 @@ public void testHeartbeatWithCommandStatusReport() throws Exception {
|
|||||||
assertEquals(0, scmServerImpl.getCommandStatusReportCount());
|
assertEquals(0, scmServerImpl.getCommandStatusReportCount());
|
||||||
|
|
||||||
// Send heartbeat again from heartbeat endpoint task
|
// Send heartbeat again from heartbeat endpoint task
|
||||||
final StateContext stateContext = heartbeatTaskHelper(serverAddress, 3000);
|
final StateContext stateContext = heartbeatTaskHelper(
|
||||||
|
serverAddress, 3000);
|
||||||
Map<Long, CommandStatus> map = stateContext.getCommandStatusMap();
|
Map<Long, CommandStatus> map = stateContext.getCommandStatusMap();
|
||||||
assertNotNull(map);
|
assertNotNull(map);
|
||||||
assertEquals("Should have 3 objects", 3, map.size());
|
assertEquals("Should have 3 objects", 3, map.size());
|
||||||
|
@ -87,11 +87,13 @@ public void testCapacityPlacementYieldsBetterDataDistribution() throws
|
|||||||
for (int x = 0; x < opsCount; x++) {
|
for (int x = 0; x < opsCount; x++) {
|
||||||
long containerSize = random.nextInt(100) * OzoneConsts.GB;
|
long containerSize = random.nextInt(100) * OzoneConsts.GB;
|
||||||
List<DatanodeDetails> nodesCapacity =
|
List<DatanodeDetails> nodesCapacity =
|
||||||
capacityPlacer.chooseDatanodes(new ArrayList<>(), nodesRequired, containerSize);
|
capacityPlacer.chooseDatanodes(new ArrayList<>(), nodesRequired,
|
||||||
|
containerSize);
|
||||||
assertEquals(nodesRequired, nodesCapacity.size());
|
assertEquals(nodesRequired, nodesCapacity.size());
|
||||||
|
|
||||||
List<DatanodeDetails> nodesRandom =
|
List<DatanodeDetails> nodesRandom =
|
||||||
randomPlacer.chooseDatanodes(nodesCapacity, nodesRequired, containerSize);
|
randomPlacer.chooseDatanodes(nodesCapacity, nodesRequired,
|
||||||
|
containerSize);
|
||||||
|
|
||||||
// One fifth of all calls are delete
|
// One fifth of all calls are delete
|
||||||
if (x % 5 == 0) {
|
if (x % 5 == 0) {
|
||||||
|
@ -18,7 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.ozone.client;
|
package org.apache.hadoop.ozone.client;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
|
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
|
||||||
@ -158,9 +157,9 @@ private class VolumeIterator implements Iterator<OzoneVolume> {
|
|||||||
private OzoneVolume currentValue;
|
private OzoneVolume currentValue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an Iterator to iterate over all volumes after prevVolume of the user.
|
* Creates an Iterator to iterate over all volumes after
|
||||||
* If prevVolume is null it iterates from the first volume. The returned volumes
|
* prevVolume of the user. If prevVolume is null it iterates from the
|
||||||
* match volume prefix.
|
* first volume. The returned volumes match volume prefix.
|
||||||
* @param user user name
|
* @param user user name
|
||||||
* @param volPrefix volume prefix to match
|
* @param volPrefix volume prefix to match
|
||||||
*/
|
*/
|
||||||
|
@ -28,10 +28,7 @@
|
|||||||
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
|
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
|
||||||
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
|
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
|
||||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||||
import org.apache.hadoop.ozone.HddsDatanodeService;
|
|
||||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
@ -39,7 +36,6 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.NavigableSet;
|
import java.util.NavigableSet;
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
|
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
|
||||||
|
@ -23,7 +23,6 @@
|
|||||||
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
|
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.ozone.client.rpc.RpcClient;
|
import org.apache.hadoop.ozone.client.rpc.RpcClient;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
|
||||||
import org.apache.hadoop.ozone.client.rest.OzoneException;
|
import org.apache.hadoop.ozone.client.rest.OzoneException;
|
||||||
import org.apache.ratis.rpc.RpcType;
|
import org.apache.ratis.rpc.RpcType;
|
||||||
import org.apache.ratis.rpc.SupportedRpcType;
|
import org.apache.ratis.rpc.SupportedRpcType;
|
||||||
@ -54,9 +53,9 @@ class RatisTestSuite implements Closeable {
|
|||||||
* OZONE_ENABLED = true
|
* OZONE_ENABLED = true
|
||||||
* RATIS_ENABLED = true
|
* RATIS_ENABLED = true
|
||||||
*/
|
*/
|
||||||
public RatisTestSuite(final Class<?> clazz)
|
public RatisTestSuite()
|
||||||
throws IOException, TimeoutException, InterruptedException {
|
throws IOException, TimeoutException, InterruptedException {
|
||||||
conf = newOzoneConfiguration(clazz, RPC);
|
conf = newOzoneConfiguration(RPC);
|
||||||
cluster = newMiniOzoneCluster(NUM_DATANODES, conf);
|
cluster = newMiniOzoneCluster(NUM_DATANODES, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,8 +83,7 @@ public int getDatanodeOzoneRestPort() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static OzoneConfiguration newOzoneConfiguration(
|
static OzoneConfiguration newOzoneConfiguration(RpcType rpc) {
|
||||||
Class<?> clazz, RpcType rpc) {
|
|
||||||
final OzoneConfiguration conf = new OzoneConfiguration();
|
final OzoneConfiguration conf = new OzoneConfiguration();
|
||||||
initRatisConf(rpc, conf);
|
initRatisConf(rpc, conf);
|
||||||
return conf;
|
return conf;
|
||||||
|
@ -27,9 +27,7 @@
|
|||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
|
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
|
||||||
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
|
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
|
||||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
|
|
||||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
||||||
|
@ -73,8 +73,7 @@ public class TestCloseContainerHandlingByClient {
|
|||||||
/**
|
/**
|
||||||
* Create a MiniDFSCluster for testing.
|
* Create a MiniDFSCluster for testing.
|
||||||
* <p>
|
* <p>
|
||||||
* Ozone is made active by setting OZONE_ENABLED = true and
|
* Ozone is made active by setting OZONE_ENABLED = true
|
||||||
* OZONE_HANDLER_TYPE_KEY = "distributed"
|
|
||||||
*
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
|
@ -38,12 +38,10 @@
|
|||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -75,7 +75,6 @@
|
|||||||
|
|
||||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage.COMBINED;
|
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage.COMBINED;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ROOT_PREFIX;
|
|
||||||
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
|
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
|
||||||
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
|
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
|
||||||
import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum;
|
import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum;
|
||||||
|
@ -53,8 +53,7 @@ public class TestOzoneContainerRatis {
|
|||||||
public Timeout testTimeout = new Timeout(300000);
|
public Timeout testTimeout = new Timeout(300000);
|
||||||
|
|
||||||
static OzoneConfiguration newOzoneConfiguration() {
|
static OzoneConfiguration newOzoneConfiguration() {
|
||||||
final OzoneConfiguration conf = new OzoneConfiguration();
|
return new OzoneConfiguration();
|
||||||
return conf;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void runTestOzoneContainerViaDataNodeRatis(
|
private static void runTestOzoneContainerViaDataNodeRatis(
|
||||||
|
@ -23,7 +23,6 @@
|
|||||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.ozone.RatisTestHelper;
|
import org.apache.hadoop.ozone.RatisTestHelper;
|
||||||
import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
|
||||||
import org.apache.ratis.rpc.RpcType;
|
import org.apache.ratis.rpc.RpcType;
|
||||||
import org.apache.ratis.rpc.SupportedRpcType;
|
import org.apache.ratis.rpc.SupportedRpcType;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
@ -46,8 +45,7 @@ public class TestRatisManager {
|
|||||||
TestRatisManager.class);
|
TestRatisManager.class);
|
||||||
|
|
||||||
static OzoneConfiguration newOzoneConfiguration() {
|
static OzoneConfiguration newOzoneConfiguration() {
|
||||||
final OzoneConfiguration conf = new OzoneConfiguration();
|
return new OzoneConfiguration();
|
||||||
return conf;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -30,8 +30,6 @@
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.ExpectedException;
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test allocate container calls.
|
* Test allocate container calls.
|
||||||
*/
|
*/
|
||||||
|
@ -23,7 +23,6 @@
|
|||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
import org.apache.hadoop.ozone.client.rest.headers.Header;
|
import org.apache.hadoop.ozone.client.rest.headers.Header;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.http.HttpResponse;
|
import org.apache.http.HttpResponse;
|
||||||
import org.apache.http.client.methods.HttpPost;
|
import org.apache.http.client.methods.HttpPost;
|
||||||
|
@ -30,7 +30,6 @@
|
|||||||
import org.apache.hadoop.ozone.client.rpc.RpcClient;
|
import org.apache.hadoop.ozone.client.rpc.RpcClient;
|
||||||
import org.apache.hadoop.ozone.web.request.OzoneQuota;
|
import org.apache.hadoop.ozone.web.request.OzoneQuota;
|
||||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -95,8 +94,6 @@ public static void init()
|
|||||||
InterruptedException {
|
InterruptedException {
|
||||||
conf = new OzoneConfiguration();
|
conf = new OzoneConfiguration();
|
||||||
|
|
||||||
String path = GenericTestUtils
|
|
||||||
.getTempPath(TestBuckets.class.getSimpleName());
|
|
||||||
cluster = MiniOzoneCluster.newBuilder(conf)
|
cluster = MiniOzoneCluster.newBuilder(conf)
|
||||||
.setNumDatanodes(3)
|
.setNumDatanodes(3)
|
||||||
.build();
|
.build();
|
||||||
@ -128,7 +125,7 @@ public void testCreateBucket() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void runTestCreateBucket(ClientProtocol client)
|
static void runTestCreateBucket(ClientProtocol client)
|
||||||
throws OzoneException, IOException, ParseException {
|
throws IOException {
|
||||||
String volumeName = OzoneUtils.getRequestID().toLowerCase();
|
String volumeName = OzoneUtils.getRequestID().toLowerCase();
|
||||||
VolumeArgs volumeArgs = VolumeArgs.newBuilder()
|
VolumeArgs volumeArgs = VolumeArgs.newBuilder()
|
||||||
.setOwner("bilbo")
|
.setOwner("bilbo")
|
||||||
|
@ -61,7 +61,7 @@ public static Collection<Object[]> clientProtocol() {
|
|||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void init() throws Exception {
|
public static void init() throws Exception {
|
||||||
suite = new RatisTestHelper.RatisTestSuite(TestBucketsRatis.class);
|
suite = new RatisTestHelper.RatisTestSuite();
|
||||||
conf = suite.getConf();
|
conf = suite.getConf();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ public class TestKeysRatis {
|
|||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void init() throws Exception {
|
public static void init() throws Exception {
|
||||||
suite = new RatisTestHelper.RatisTestSuite(TestBucketsRatis.class);
|
suite = new RatisTestHelper.RatisTestSuite();
|
||||||
path = GenericTestUtils.getTempPath(TestKeysRatis.class.getSimpleName());
|
path = GenericTestUtils.getTempPath(TestKeysRatis.class.getSimpleName());
|
||||||
ozoneCluster = suite.getCluster();
|
ozoneCluster = suite.getCluster();
|
||||||
ozoneCluster.waitForClusterToBeReady();
|
ozoneCluster.waitForClusterToBeReady();
|
||||||
|
@ -43,28 +43,15 @@
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.ozone
|
import static org.apache.hadoop.ozone
|
||||||
.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
|
.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
|
||||||
import static org.apache.hadoop.ozone
|
import static org.apache.hadoop.ozone
|
||||||
.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
|
.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
|
||||||
import static org.apache.hadoop.ozone
|
|
||||||
.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
|
|
||||||
import static org.apache.hadoop.ozone
|
|
||||||
.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
|
|
||||||
import static org.apache.hadoop.ozone
|
|
||||||
.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
|
|
||||||
import static org.apache.hadoop.ozone
|
|
||||||
.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
|
|
||||||
import static org.apache.hadoop.ozone
|
import static org.apache.hadoop.ozone
|
||||||
.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
|
.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
|
||||||
import static org.apache.hadoop.ozone
|
import static org.apache.hadoop.ozone
|
||||||
.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
|
.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
|
||||||
.OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS;
|
|
||||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
|
||||||
.OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT;
|
|
||||||
import static org.apache.hadoop.ozone
|
import static org.apache.hadoop.ozone
|
||||||
.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
|
.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.ozone
|
import static org.apache.hadoop.ozone
|
||||||
|
@ -42,7 +42,6 @@
|
|||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
|
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
|
||||||
import org.apache.hadoop.ozone.client.ObjectStore;
|
import org.apache.hadoop.ozone.client.ObjectStore;
|
||||||
import org.apache.hadoop.ozone.client.OzoneBucket;
|
import org.apache.hadoop.ozone.client.OzoneBucket;
|
||||||
import org.apache.hadoop.ozone.client.OzoneClient;
|
import org.apache.hadoop.ozone.client.OzoneClient;
|
||||||
@ -60,7 +59,6 @@
|
|||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
|
||||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
|
import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
|
||||||
|
@ -19,7 +19,6 @@
|
|||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.primitives.Longs;
|
import com.google.common.primitives.Longs;
|
||||||
import com.google.protobuf.ByteString;
|
|
||||||
import org.apache.commons.cli.BasicParser;
|
import org.apache.commons.cli.BasicParser;
|
||||||
import org.apache.commons.cli.CommandLine;
|
import org.apache.commons.cli.CommandLine;
|
||||||
import org.apache.commons.cli.Option;
|
import org.apache.commons.cli.Option;
|
||||||
@ -31,7 +30,6 @@
|
|||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
||||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo;
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
|
||||||
@ -56,8 +54,6 @@
|
|||||||
import java.sql.DriverManager;
|
import java.sql.DriverManager;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.Statement;
|
import java.sql.Statement;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_SUFFIX;
|
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_SUFFIX;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
|
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
|
||||||
|
Loading…
Reference in New Issue
Block a user