HDFS-12756. Ozone: Add datanodeID to heartbeat responses and container protocol. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2017-11-10 10:26:26 -08:00 committed by Owen O'Malley
parent 446e84357d
commit 4460ac9c57
52 changed files with 783 additions and 690 deletions

View File

@ -50,7 +50,6 @@ public class XceiverClientHandler extends
private final Pipeline pipeline;
private volatile Channel channel;
private XceiverClientMetrics metrics;
/**
@ -58,6 +57,7 @@ public class XceiverClientHandler extends
*/
public XceiverClientHandler(Pipeline pipeline) {
super(false);
Preconditions.checkNotNull(pipeline);
this.pipeline = pipeline;
this.metrics = XceiverClientManager.getXceiverClientMetrics();
}
@ -139,6 +139,13 @@ public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
if (StringUtils.isEmpty(request.getTraceID())) {
throw new IllegalArgumentException("Invalid trace ID");
}
// Setting the datanode ID in the commands, so that we can distinguish
// commands when the cluster simulator is running.
if(!request.hasDatanodeID()) {
throw new IllegalArgumentException("Invalid Datanode ID");
}
metrics.incrPendingContainerOpsMetrics(request.getCmdType());
CompletableFuture<ContainerCommandResponseProto> future

View File

@ -82,10 +82,12 @@ public static GetKeyResponseProto getKey(XceiverClientSpi xceiverClient,
.newBuilder()
.setPipeline(xceiverClient.getPipeline().getProtobufMessage())
.setKeyData(containerKeyData);
String id = xceiverClient.getPipeline().getLeader().getDatanodeUuid();
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.GetKey)
.setTraceID(traceID)
.setDatanodeID(id)
.setGetKey(readKeyRequest)
.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
@ -107,10 +109,12 @@ public static void putKey(XceiverClientSpi xceiverClient,
.newBuilder()
.setPipeline(xceiverClient.getPipeline().getProtobufMessage())
.setKeyData(containerKeyData);
String id = xceiverClient.getPipeline().getLeader().getDatanodeUuid();
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.PutKey)
.setTraceID(traceID)
.setDatanodeID(id)
.setPutKey(createKeyRequest)
.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
@ -135,10 +139,12 @@ public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient,
.setPipeline(xceiverClient.getPipeline().getProtobufMessage())
.setKeyName(key)
.setChunkData(chunk);
String id = xceiverClient.getPipeline().getLeader().getDatanodeUuid();
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.ReadChunk)
.setTraceID(traceID)
.setDatanodeID(id)
.setReadChunk(readChunkRequest)
.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
@ -165,10 +171,12 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
.setKeyName(key)
.setChunkData(chunk)
.setData(data);
String id = xceiverClient.getPipeline().getLeader().getDatanodeUuid();
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.WriteChunk)
.setTraceID(traceID)
.setDatanodeID(id)
.setWriteChunk(writeChunkRequest)
.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
@ -212,9 +220,14 @@ public static void writeSmallFile(XceiverClientSpi client,
.setKey(createKeyRequest).setData(ByteString.copyFrom(data))
.build();
String id = client.getPipeline().getLeader().getDatanodeUuid();
ContainerCommandRequestProto request =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutSmallFile)
.setTraceID(traceID).setPutSmallFile(putSmallFileRequest).build();
ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.PutSmallFile)
.setTraceID(traceID)
.setDatanodeID(id)
.setPutSmallFile(putSmallFileRequest)
.build();
ContainerCommandResponseProto response = client.sendCommand(request);
validateContainerResponse(response);
}
@ -236,10 +249,12 @@ public static void createContainer(XceiverClientSpi client, String traceID)
createRequest.setPipeline(client.getPipeline().getProtobufMessage());
createRequest.setContainerData(containerData.build());
String id = client.getPipeline().getLeader().getDatanodeUuid();
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.CreateContainer);
request.setCreateContainer(createRequest);
request.setDatanodeID(id);
request.setTraceID(traceID);
ContainerCommandResponseProto response = client.sendCommand(
request.build());
@ -261,12 +276,13 @@ public static void deleteContainer(XceiverClientSpi client,
deleteRequest.setName(client.getPipeline().getContainerName());
deleteRequest.setPipeline(client.getPipeline().getProtobufMessage());
deleteRequest.setForceDelete(force);
String id = client.getPipeline().getLeader().getDatanodeUuid();
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.DeleteContainer);
request.setDeleteContainer(deleteRequest);
request.setTraceID(traceID);
request.setDatanodeID(id);
ContainerCommandResponseProto response =
client.sendCommand(request.build());
validateContainerResponse(response);
@ -285,11 +301,13 @@ public static void closeContainer(XceiverClientSpi client, String traceID)
ContainerProtos.CloseContainerRequestProto.newBuilder();
closeRequest.setPipeline(client.getPipeline().getProtobufMessage());
String id = client.getPipeline().getLeader().getDatanodeUuid();
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(Type.CloseContainer);
request.setCloseContainer(closeRequest);
request.setTraceID(traceID);
request.setDatanodeID(id);
ContainerCommandResponseProto response =
client.sendCommand(request.build());
validateContainerResponse(response);
@ -309,11 +327,12 @@ public static ReadContainerResponseProto readContainer(
ReadContainerRequestProto.newBuilder();
readRequest.setName(containerName);
readRequest.setPipeline(client.getPipeline().getProtobufMessage());
String id = client.getPipeline().getLeader().getDatanodeUuid();
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(Type.ReadContainer);
request.setReadContainer(readRequest);
request.setDatanodeID(id);
request.setTraceID(traceID);
ContainerCommandResponseProto response =
client.sendCommand(request.build());
@ -346,10 +365,12 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
GetSmallFileRequestProto
.newBuilder().setKey(getKey)
.build();
String id = client.getPipeline().getLeader().getDatanodeUuid();
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.GetSmallFile)
.setTraceID(traceID)
.setDatanodeID(id)
.setGetSmallFile(getSmallFileRequest)
.build();
ContainerCommandResponseProto response = client.sendCommand(request);

View File

@ -164,6 +164,7 @@ message ContainerCommandRequestProto {
optional PutSmallFileRequestProto putSmallFile = 16;
optional GetSmallFileRequestProto getSmallFile = 17;
optional CloseContainerRequestProto closeContainer = 18;
required string datanodeID = 19;
}
message ContainerCommandResponseProto {

View File

@ -103,7 +103,7 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
.sendHeartbeat(datanodeID, this.context.getNodeReport(),
this.context.getContainerReportState());
processResponse(reponse);
processResponse(reponse, datanodeID);
rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
rpcEndpoint.zeroMissedCount();
} catch (IOException ex) {
@ -127,9 +127,14 @@ public static Builder newBuilder() {
*
* @param response - SCMHeartbeat response.
*/
private void processResponse(SCMHeartbeatResponseProto response) {
private void processResponse(SCMHeartbeatResponseProto response,
final DatanodeID datanodeID) {
for (SCMCommandResponseProto commandResponseProto : response
.getCommandsList()) {
// Verify the response is indeed for this datanode.
Preconditions.checkState(commandResponseProto.getDatanodeUUID()
.equalsIgnoreCase(datanodeID.getDatanodeUuid().toString()),
"Unexpected datanode ID in the response.");
switch (commandResponseProto.getCmdType()) {
case sendContainerReport:
this.context.addCommand(SendContainerCommand.getFromProtobuf(

View File

@ -226,8 +226,9 @@ private StorageContainerManager(OzoneConfiguration conf)
StorageContainerManager.initMetrics();
scmStorage = new SCMStorage(conf);
if (scmStorage.getState() != StorageState.INITIALIZED) {
throw new SCMException("SCM not initialized.",
String clusterId = scmStorage.getClusterID();
if (clusterId == null) {
throw new SCMException("clusterId not found",
ResultCodes.SCM_NOT_INITIALIZED);
}
scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID());
@ -492,11 +493,13 @@ private static StartupOption parseArguments(String[] args) {
* @throws InvalidProtocolBufferException
*/
@VisibleForTesting
public SCMCommandResponseProto getCommandResponse(SCMCommand cmd)
public SCMCommandResponseProto getCommandResponse(SCMCommand cmd,
final String datanodID)
throws IOException {
Type type = cmd.getType();
SCMCommandResponseProto.Builder builder =
SCMCommandResponseProto.newBuilder();
SCMCommandResponseProto.newBuilder()
.setDatanodeUUID(datanodID);
switch (type) {
case registeredCommand:
return builder.setCmdType(Type.registeredCommand)
@ -881,7 +884,8 @@ public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
getScmNodeManager().sendHeartbeat(datanodeID, nodeReport, reportState);
List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
for (SCMCommand cmd : commands) {
cmdResponses.add(getCommandResponse(cmd));
cmdResponses.add(getCommandResponse(cmd, datanodeID.getDatanodeUuid()
.toString()));
}
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
.build();

View File

@ -231,6 +231,7 @@ message SCMCommandResponseProto {
optional SendContainerReportProto sendReport = 5;
optional SCMReregisterCmdResponseProto reregisterProto = 6;
optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 7;
required string datanodeUUID = 8;
}

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.scm.XceiverClientManager;
@ -71,7 +72,7 @@ public static void init() throws IOException {
config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
cluster = new MiniOzoneCluster.Builder(config)
cluster = new MiniOzoneClassicCluster.Builder(config)
.numDataNodes(1).setHandlerType("distributed").build();
storageContainerLocationClient = cluster
.createStorageContainerLocationClient();

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -77,7 +78,7 @@ public static void init() throws IOException {
config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
cluster = new MiniOzoneCluster.Builder(config)
cluster = new MiniOzoneClassicCluster.Builder(config)
.numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageContainerLocationClient = cluster

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -83,7 +84,7 @@ public static void init() throws IOException {
config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
cluster = new MiniOzoneCluster.Builder(config)
cluster = new MiniOzoneClassicCluster.Builder(config)
.numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageContainerLocationClient = cluster

View File

@ -0,0 +1,556 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;
import java.io.File;
import java.util.Optional;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.container.common
.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.ksm.KeySpaceManager;
import org.apache.hadoop.ozone.scm.SCMStorage;
import org.apache.hadoop.ozone.web.client.OzoneRestClient;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.scm.StorageContainerManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.DFS_CONTAINER_IPC_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.DFS_CONTAINER_IPC_RANDOM_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.DFS_CONTAINER_RATIS_IPC_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState
.HEALTHY;
import static org.junit.Assert.assertFalse;
/**
* MiniOzoneCluster creates a complete in-process Ozone cluster suitable for
* running tests. The cluster consists of a StorageContainerManager, Namenode
* and multiple DataNodes. This class subclasses {@link MiniDFSCluster} for
* convenient reuse of logic for starting DataNodes.
*/
@InterfaceAudience.Private
public final class MiniOzoneClassicCluster extends MiniDFSCluster
implements MiniOzoneCluster {
private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneClassicCluster.class);
private static final String USER_AUTH = "hdfs";
private final OzoneConfiguration conf;
private final StorageContainerManager scm;
private final KeySpaceManager ksm;
private final Path tempPath;
/**
* Creates a new MiniOzoneCluster.
*
* @param builder cluster builder
* @param scm StorageContainerManager, already running
* @throws IOException if there is an I/O error
*/
private MiniOzoneClassicCluster(Builder builder, StorageContainerManager scm,
KeySpaceManager ksm)
throws IOException {
super(builder);
this.conf = builder.conf;
this.scm = scm;
this.ksm = ksm;
tempPath = Paths.get(builder.getPath(), builder.getRunID());
}
@Override
protected void setupDatanodeAddress(
int i, Configuration dnConf, boolean setupHostsFile,
boolean checkDnAddrConf) throws IOException {
super.setupDatanodeAddress(i, dnConf, setupHostsFile, checkDnAddrConf);
setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
getInstanceStorageDir(i, -1).getCanonicalPath());
String containerMetaDirs = dnConf.get(
OzoneConfigKeys.OZONE_METADATA_DIRS) + "-dn-" + i;
Path containerMetaDirPath = Paths.get(containerMetaDirs);
setConf(i, dnConf, OzoneConfigKeys.OZONE_METADATA_DIRS,
containerMetaDirs);
Path containerRootPath =
containerMetaDirPath.resolve(OzoneConsts.CONTAINER_ROOT_PREFIX);
Files.createDirectories(containerRootPath);
}
static void setConf(int i, Configuration conf, String key, String value) {
conf.set(key, value);
LOG.info("dn{}: set {} = {}", i, key, value);
}
@Override
public void close() {
shutdown();
try {
FileUtils.deleteDirectory(tempPath.toFile());
} catch (IOException e) {
String errorMessage = "Cleaning up metadata directories failed." + e;
assertFalse(errorMessage, true);
}
try {
final String localStorage =
conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
FileUtils.deleteDirectory(new File(localStorage));
} catch (IOException e) {
LOG.error("Cleaning up local storage failed", e);
}
}
@Override
public boolean restartDataNode(int i) throws IOException {
return restartDataNode(i, true);
}
/*
* Restart a particular datanode, wait for it to become active
*/
@Override
public boolean restartDataNode(int i, boolean keepPort) throws IOException {
LOG.info("restarting datanode:{} keepPort:{}", i, keepPort);
if (keepPort) {
DataNodeProperties dnProp = dataNodes.get(i);
OzoneContainer container =
dnProp.getDatanode().getOzoneContainerManager();
Configuration config = dnProp.getConf();
int currentPort = container.getContainerServerPort();
config.setInt(DFS_CONTAINER_IPC_PORT, currentPort);
config.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false);
int ratisPort = container.getRatisContainerServerPort();
config.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
config.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
}
boolean status = super.restartDataNode(i, keepPort);
try {
this.waitActive();
waitDatanodeOzoneReady(i);
} catch (TimeoutException | InterruptedException e) {
Thread.interrupted();
}
return status;
}
@Override
public void shutdown() {
super.shutdown();
LOG.info("Shutting down the Mini Ozone Cluster");
if (ksm != null) {
LOG.info("Shutting down the keySpaceManager");
ksm.stop();
ksm.join();
}
if (scm != null) {
LOG.info("Shutting down the StorageContainerManager");
scm.stop();
scm.join();
}
}
@Override
public StorageContainerManager getStorageContainerManager() {
return this.scm;
}
@Override
public KeySpaceManager getKeySpaceManager() {
return this.ksm;
}
/**
* Creates an {@link OzoneRestClient} connected to this cluster's REST
* service. Callers take ownership of the client and must close it when done.
*
* @return OzoneRestClient connected to this cluster's REST service
* @throws OzoneException if Ozone encounters an error creating the client
*/
@Override
public OzoneRestClient createOzoneRestClient() throws OzoneException {
Preconditions.checkState(!getDataNodes().isEmpty(),
"Cannot create OzoneRestClient if the cluster has no DataNodes.");
// An Ozone request may originate at any DataNode, so pick one at random.
int dnIndex = new Random().nextInt(getDataNodes().size());
String uri = String.format("http://127.0.0.1:%d",
getDataNodes().get(dnIndex).getInfoPort());
LOG.info("Creating Ozone client to DataNode {} with URI {} and user {}",
dnIndex, uri, USER_AUTH);
try {
return new OzoneRestClient(uri, USER_AUTH);
} catch (URISyntaxException e) {
// We control the REST service URI, so it should never be invalid.
throw new IllegalStateException("Unexpected URISyntaxException", e);
}
}
/**
* Creates an RPC proxy connected to this cluster's StorageContainerManager
* for accessing container location information. Callers take ownership of
* the proxy and must close it when done.
*
* @return RPC proxy for accessing container location information
* @throws IOException if there is an I/O error
*/
@Override
public StorageContainerLocationProtocolClientSideTranslatorPB
createStorageContainerLocationClient() throws IOException {
long version = RPC.getProtocolVersion(
StorageContainerLocationProtocolPB.class);
InetSocketAddress address = scm.getClientRpcAddress();
LOG.info(
"Creating StorageContainerLocationProtocol RPC client with address {}",
address);
return new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
}
/**
* Waits for the Ozone cluster to be ready for processing requests.
*/
@Override
public void waitOzoneReady() throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
final int healthy = scm.getNodeCount(HEALTHY);
final boolean isReady = healthy >= numDataNodes;
LOG.info("{}. Got {} of {} DN Heartbeats.",
isReady? "Cluster is ready" : "Waiting for cluster to be ready",
healthy, numDataNodes);
return isReady;
}, 1000, 60 * 1000); //wait for 1 min.
}
/**
* Waits for a particular Datanode to be ready for processing ozone requests.
*/
@Override
public void waitDatanodeOzoneReady(int dnIndex)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
DatanodeStateMachine.DatanodeStates state =
dataNodes.get(dnIndex).getDatanode().getOzoneStateMachineState();
final boolean rebootComplete =
(state == DatanodeStateMachine.DatanodeStates.RUNNING);
LOG.info("{} Current state:{}", rebootComplete, state);
return rebootComplete;
}, 1000, 60 * 1000); //wait for 1 min.
}
/**
* Waits for SCM to be out of Chill Mode. Many tests can be run iff we are out
* of Chill mode.
*
* @throws TimeoutException
* @throws InterruptedException
*/
@Override
public void waitTobeOutOfChillMode() throws TimeoutException,
InterruptedException {
GenericTestUtils.waitFor(() -> {
if (scm.getScmNodeManager().isOutOfChillMode()) {
return true;
}
LOG.info("Waiting for cluster to be ready. No datanodes found");
return false;
}, 100, 45000);
}
@Override
public void waitForHeartbeatProcessed() throws TimeoutException,
InterruptedException {
GenericTestUtils.waitFor(() ->
scm.getScmNodeManager().waitForHeartbeatProcessed(), 100,
4 * 1000);
GenericTestUtils.waitFor(() ->
scm.getScmNodeManager().getStats().getCapacity().get() > 0, 100,
4 * 1000);
}
/**
* Builder for configuring the MiniOzoneCluster to run.
*/
public static class Builder
extends MiniDFSCluster.Builder {
private final OzoneConfiguration conf;
private static final int DEFAULT_HB_SECONDS = 1;
private static final int DEFAULT_PROCESSOR_MS = 100;
private final String path;
private final UUID runID;
private Optional<String> ozoneHandlerType = java.util.Optional.empty();
private Optional<Boolean> enableTrace = Optional.of(false);
private Optional<Integer> hbSeconds = Optional.empty();
private Optional<Integer> hbProcessorInterval = Optional.empty();
private Optional<String> scmMetadataDir = Optional.empty();
private Boolean ozoneEnabled = true;
private Boolean waitForChillModeFinish = true;
private Boolean randomContainerPort = true;
// Use relative smaller number of handlers for testing
private int numOfKsmHandlers = 20;
private int numOfScmHandlers = 20;
/**
* Creates a new Builder.
*
* @param conf configuration
*/
public Builder(OzoneConfiguration conf) {
super(conf);
// Mini Ozone cluster will not come up if the port is not true, since
// Ratis will exit if the server port cannot be bound. We can remove this
// hard coding once we fix the Ratis default behaviour.
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
true);
this.conf = conf;
path = GenericTestUtils.getTempPath(
MiniOzoneClassicCluster.class.getSimpleName() +
UUID.randomUUID().toString());
runID = UUID.randomUUID();
}
public Builder setRandomContainerPort(boolean randomPort) {
this.randomContainerPort = randomPort;
return this;
}
@Override
public Builder numDataNodes(int val) {
super.numDataNodes(val);
return this;
}
@Override
public Builder storageCapacities(long[] capacities) {
super.storageCapacities(capacities);
return this;
}
public Builder setHandlerType(String handler) {
ozoneHandlerType = Optional.of(handler);
return this;
}
public Builder setTrace(Boolean trace) {
enableTrace = Optional.of(trace);
return this;
}
public Builder setSCMHBInterval(int seconds) {
hbSeconds = Optional.of(seconds);
return this;
}
public Builder setSCMHeartbeatProcessingInterval(int milliseconds) {
hbProcessorInterval = Optional.of(milliseconds);
return this;
}
public Builder setSCMMetadataDir(String scmMetadataDirPath) {
scmMetadataDir = Optional.of(scmMetadataDirPath);
return this;
}
public Builder disableOzone() {
ozoneEnabled = false;
return this;
}
public Builder doNotwaitTobeOutofChillMode() {
waitForChillModeFinish = false;
return this;
}
public Builder setNumOfKSMHandlers(int numOfHandlers) {
numOfKsmHandlers = numOfHandlers;
return this;
}
public Builder setNumOfSCMHandlers(int numOfHandlers) {
numOfScmHandlers = numOfHandlers;
return this;
}
public String getPath() {
return path;
}
public String getRunID() {
return runID.toString();
}
@Override
public MiniOzoneClassicCluster build() throws IOException {
configureHandler();
configureTrace();
configureSCMheartbeat();
configScmMetadata();
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY, "127.0.0.1:0");
conf.set(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY, "127.0.0.1:0");
conf.set(KSMConfigKeys.OZONE_KSM_HTTP_ADDRESS_KEY, "127.0.0.1:0");
// Configure KSM and SCM handlers
conf.setInt(ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY, numOfScmHandlers);
conf.setInt(KSMConfigKeys.OZONE_KSM_HANDLER_COUNT_KEY, numOfKsmHandlers);
// Use random ports for ozone containers in mini cluster,
// in order to launch multiple container servers per node.
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
randomContainerPort);
SCMStorage scmStorage = new SCMStorage(conf);
scmStorage.initialize();
StorageContainerManager scm = StorageContainerManager.createSCM(
null, conf);
scm.start();
KeySpaceManager ksm = new KeySpaceManager(conf);
ksm.start();
String addressString = scm.getDatanodeRpcAddress().getHostString() +
":" + scm.getDatanodeRpcAddress().getPort();
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, addressString);
MiniOzoneClassicCluster cluster =
new MiniOzoneClassicCluster(this, scm, ksm);
try {
cluster.waitOzoneReady();
if (waitForChillModeFinish) {
cluster.waitTobeOutOfChillMode();
}
cluster.waitForHeartbeatProcessed();
} catch (Exception e) {
// A workaround to propagate MiniOzoneCluster failures without
// changing the method signature (which would require cascading
// changes to hundreds of unrelated HDFS tests).
throw new IOException("Failed to start MiniOzoneCluster", e);
}
return cluster;
}
private void configScmMetadata() throws IOException {
if (scmMetadataDir.isPresent()) {
// if user specifies a path in the test, it is assumed that user takes
// care of creating and cleaning up that directory after the tests.
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
scmMetadataDir.get());
return;
}
// If user has not specified a path, create a UUID for this miniCluster
// and create SCM under that directory.
Path scmPath = Paths.get(path, runID.toString(), "cont-meta");
Files.createDirectories(scmPath);
Path containerPath = scmPath.resolve(OzoneConsts.CONTAINER_ROOT_PREFIX);
Files.createDirectories(containerPath);
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, scmPath
.toString());
// TODO : Fix this, we need a more generic mechanism to map
// different datanode ID for different datanodes when we have lots of
// datanodes in the cluster.
conf.setStrings(ScmConfigKeys.OZONE_SCM_DATANODE_ID,
scmPath.toString() + "/datanode.id");
}
private void configureHandler() {
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, this.ozoneEnabled);
if (!ozoneHandlerType.isPresent()) {
throw new IllegalArgumentException(
"The Ozone handler type must be specified.");
} else {
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
ozoneHandlerType.get());
}
}
private void configureTrace() {
if (enableTrace.isPresent()) {
conf.setBoolean(OzoneConfigKeys.OZONE_TRACE_ENABLED_KEY,
enableTrace.get());
GenericTestUtils.setRootLogLevel(Level.TRACE);
}
GenericTestUtils.setRootLogLevel(Level.INFO);
}
private void configureSCMheartbeat() {
if (hbSeconds.isPresent()) {
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
hbSeconds.get());
} else {
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
DEFAULT_HB_SECONDS);
}
if (hbProcessorInterval.isPresent()) {
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS,
hbProcessorInterval.get());
} else {
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS,
DEFAULT_PROCESSOR_MS);
}
}
}
}

View File

@ -17,564 +17,46 @@
*/
package org.apache.hadoop.ozone;
import java.io.File;
import java.util.Optional;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.container.common
.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.ksm.KeySpaceManager;
import org.apache.hadoop.ozone.ksm.protocolPB
.KeySpaceManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
import org.apache.hadoop.ozone.scm.SCMStorage;
import org.apache.hadoop.ozone.scm.StorageContainerManager;
import org.apache.hadoop.ozone.web.client.OzoneRestClient;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.scm.StorageContainerManager;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.DFS_CONTAINER_IPC_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.DFS_CONTAINER_IPC_RANDOM_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.DFS_CONTAINER_RATIS_IPC_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT;
import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState
.HEALTHY;
import static org.junit.Assert.assertFalse;
/**
* MiniOzoneCluster creates a complete in-process Ozone cluster suitable for
* running tests. The cluster consists of a StorageContainerManager, Namenode
* and multiple DataNodes. This class subclasses {@link MiniDFSCluster} for
* convenient reuse of logic for starting DataNodes.
* Interface used for MiniOzoneClusters.
*/
@InterfaceAudience.Private
public final class MiniOzoneCluster extends MiniDFSCluster
implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneCluster.class);
private static final String USER_AUTH = "hdfs";
public interface MiniOzoneCluster extends AutoCloseable, Closeable {
void close();
private final OzoneConfiguration conf;
private final StorageContainerManager scm;
private final KeySpaceManager ksm;
private final Path tempPath;
boolean restartDataNode(int i) throws IOException;
/**
* Creates a new MiniOzoneCluster.
*
* @param builder cluster builder
* @param scm StorageContainerManager, already running
* @throws IOException if there is an I/O error
*/
private MiniOzoneCluster(Builder builder, StorageContainerManager scm,
KeySpaceManager ksm)
throws IOException {
super(builder);
this.conf = builder.conf;
this.scm = scm;
this.ksm = ksm;
tempPath = Paths.get(builder.getPath(), builder.getRunID());
}
boolean restartDataNode(int i, boolean keepPort) throws IOException;
void shutdown();
@Override
protected void setupDatanodeAddress(
int i, Configuration dnConf, boolean setupHostsFile,
boolean checkDnAddrConf) throws IOException {
super.setupDatanodeAddress(i, dnConf, setupHostsFile, checkDnAddrConf);
setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
getInstanceStorageDir(i, -1).getCanonicalPath());
String containerMetaDirs = dnConf.get(
OzoneConfigKeys.OZONE_METADATA_DIRS) + "-dn-" + i;
Path containerMetaDirPath = Paths.get(containerMetaDirs);
setConf(i, dnConf, OzoneConfigKeys.OZONE_METADATA_DIRS,
containerMetaDirs);
Path containerRootPath =
containerMetaDirPath.resolve(OzoneConsts.CONTAINER_ROOT_PREFIX);
Files.createDirectories(containerRootPath);
}
StorageContainerManager getStorageContainerManager();
static void setConf(int i, Configuration conf, String key, String value) {
conf.set(key, value);
LOG.info("dn{}: set {} = {}", i, key, value);
}
KeySpaceManager getKeySpaceManager();
@Override
public void close() {
shutdown();
try {
FileUtils.deleteDirectory(tempPath.toFile());
} catch (IOException e) {
String errorMessage = "Cleaning up metadata directories failed." + e;
assertFalse(errorMessage, true);
}
OzoneRestClient createOzoneRestClient() throws OzoneException;
try {
final String localStorage =
conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
FileUtils.deleteDirectory(new File(localStorage));
} catch (IOException e) {
LOG.error("Cleaning up local storage failed", e);
}
}
StorageContainerLocationProtocolClientSideTranslatorPB
createStorageContainerLocationClient() throws IOException;
public boolean restartDataNode(int i) throws IOException {
return restartDataNode(i, true);
}
/*
* Restart a particular datanode, wait for it to become active
*/
public boolean restartDataNode(int i, boolean keepPort) throws IOException {
LOG.info("restarting datanode:{} keepPort:{}", i, keepPort);
if (keepPort) {
DataNodeProperties dnProp = dataNodes.get(i);
OzoneContainer container =
dnProp.getDatanode().getOzoneContainerManager();
Configuration config = dnProp.getConf();
int currentPort = container.getContainerServerPort();
config.setInt(DFS_CONTAINER_IPC_PORT, currentPort);
config.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false);
int ratisPort = container.getRatisContainerServerPort();
config.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
config.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
}
boolean status = super.restartDataNode(i, keepPort);
void waitOzoneReady() throws TimeoutException, InterruptedException;
try {
this.waitActive();
waitDatanodeOzoneReady(i);
} catch (TimeoutException | InterruptedException e) {
Thread.interrupted();
}
return status;
}
void waitDatanodeOzoneReady(int dnIndex)
throws TimeoutException, InterruptedException;
@Override
public void shutdown() {
super.shutdown();
LOG.info("Shutting down the Mini Ozone Cluster");
void waitTobeOutOfChillMode() throws TimeoutException,
InterruptedException;
if (ksm != null) {
LOG.info("Shutting down the keySpaceManager");
ksm.stop();
ksm.join();
}
if (scm != null) {
LOG.info("Shutting down the StorageContainerManager");
scm.stop();
scm.join();
}
}
public StorageContainerManager getStorageContainerManager() {
return this.scm;
}
public KeySpaceManager getKeySpaceManager() {
return this.ksm;
}
/**
* Creates an {@link OzoneRestClient} connected to this cluster's REST
* service. Callers take ownership of the client and must close it when done.
*
* @return OzoneRestClient connected to this cluster's REST service
* @throws OzoneException if Ozone encounters an error creating the client
*/
public OzoneRestClient createOzoneRestClient() throws OzoneException {
Preconditions.checkState(!getDataNodes().isEmpty(),
"Cannot create OzoneRestClient if the cluster has no DataNodes.");
// An Ozone request may originate at any DataNode, so pick one at random.
int dnIndex = new Random().nextInt(getDataNodes().size());
String uri = String.format("http://127.0.0.1:%d",
getDataNodes().get(dnIndex).getInfoPort());
LOG.info("Creating Ozone client to DataNode {} with URI {} and user {}",
dnIndex, uri, USER_AUTH);
try {
return new OzoneRestClient(uri, USER_AUTH);
} catch (URISyntaxException e) {
// We control the REST service URI, so it should never be invalid.
throw new IllegalStateException("Unexpected URISyntaxException", e);
}
}
/**
* Creates an RPC proxy connected to this cluster's StorageContainerManager
* for accessing container location information. Callers take ownership of
* the proxy and must close it when done.
*
* @return RPC proxy for accessing container location information
* @throws IOException if there is an I/O error
*/
public StorageContainerLocationProtocolClientSideTranslatorPB
createStorageContainerLocationClient() throws IOException {
long version = RPC.getProtocolVersion(
StorageContainerLocationProtocolPB.class);
InetSocketAddress address = scm.getClientRpcAddress();
LOG.info(
"Creating StorageContainerLocationProtocol RPC client with address {}",
address);
return new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
}
/**
* Creates an RPC proxy connected to this cluster's KeySpaceManager
* for accessing Key Space Manager information. Callers take ownership of
* the proxy and must close it when done.
*
* @return RPC proxy for accessing Key Space Manager information
* @throws IOException if there is an I/O error
*/
public KeySpaceManagerProtocolClientSideTranslatorPB
createKeySpaceManagerClient() throws IOException {
long ksmVersion = RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class);
InetSocketAddress ksmAddress = OzoneClientUtils
.getKsmAddressForClients(conf);
LOG.info("Creating KeySpaceManager RPC client with address {}",
ksmAddress);
RPC.setProtocolEngine(conf, KeySpaceManagerProtocolPB.class,
ProtobufRpcEngine.class);
return new KeySpaceManagerProtocolClientSideTranslatorPB(
RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion,
ksmAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
}
/**
* Waits for the Ozone cluster to be ready for processing requests.
*/
public void waitOzoneReady() throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
final int healthy = scm.getNodeCount(HEALTHY);
final boolean isReady = healthy >= numDataNodes;
LOG.info("{}. Got {} of {} DN Heartbeats.",
isReady? "Cluster is ready" : "Waiting for cluster to be ready",
healthy, numDataNodes);
return isReady;
}, 1000, 60 * 1000); //wait for 1 min.
}
/**
* Waits for a particular Datanode to be ready for processing ozone requests.
*/
public void waitDatanodeOzoneReady(int dnIndex)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
DatanodeStateMachine.DatanodeStates state =
dataNodes.get(dnIndex).getDatanode().getOzoneStateMachineState();
final boolean rebootComplete =
(state == DatanodeStateMachine.DatanodeStates.RUNNING);
LOG.info("{} Current state:{}", rebootComplete, state);
return rebootComplete;
}, 1000, 60 * 1000); //wait for 1 min.
}
/**
* Waits for SCM to be out of Chill Mode. Many tests can be run iff we are out
* of Chill mode.
*
* @throws TimeoutException
* @throws InterruptedException
*/
public void waitTobeOutOfChillMode() throws TimeoutException,
InterruptedException {
GenericTestUtils.waitFor(() -> {
if (scm.getScmNodeManager().isOutOfChillMode()) {
return true;
}
LOG.info("Waiting for cluster to be ready. No datanodes found");
return false;
}, 100, 45000);
}
public void waitForHeartbeatProcessed() throws TimeoutException,
InterruptedException {
GenericTestUtils.waitFor(() ->
scm.getScmNodeManager().waitForHeartbeatProcessed(), 100,
4 * 1000);
GenericTestUtils.waitFor(() ->
scm.getScmNodeManager().getStats().getCapacity().get() > 0, 100,
4 * 1000);
}
/**
* Builder for configuring the MiniOzoneCluster to run.
*/
public static class Builder
extends MiniDFSCluster.Builder {
private final OzoneConfiguration conf;
private static final int DEFAULT_HB_SECONDS = 1;
private static final int DEFAULT_PROCESSOR_MS = 100;
private final String path;
private final UUID runID;
private Optional<String> ozoneHandlerType = java.util.Optional.empty();
private Optional<Boolean> enableTrace = Optional.of(false);
private Optional<Integer> hbSeconds = Optional.empty();
private Optional<Integer> hbProcessorInterval = Optional.empty();
private Optional<String> scmMetadataDir = Optional.empty();
private Boolean ozoneEnabled = true;
private Boolean waitForChillModeFinish = true;
private Boolean randomContainerPort = true;
// Use relative smaller number of handlers for testing
private int numOfKsmHandlers = 20;
private int numOfScmHandlers = 20;
/**
* Creates a new Builder.
*
* @param conf configuration
*/
public Builder(OzoneConfiguration conf) {
super(conf);
// Mini Ozone cluster will not come up if the port is not true, since
// Ratis will exit if the server port cannot be bound. We can remove this
// hard coding once we fix the Ratis default behaviour.
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
true);
this.conf = conf;
path = GenericTestUtils.getTempPath(
MiniOzoneCluster.class.getSimpleName() +
UUID.randomUUID().toString());
runID = UUID.randomUUID();
}
public Builder setRandomContainerPort(boolean randomPort) {
this.randomContainerPort = randomPort;
return this;
}
@Override
public Builder numDataNodes(int val) {
super.numDataNodes(val);
return this;
}
@Override
public Builder storageCapacities(long[] capacities) {
super.storageCapacities(capacities);
return this;
}
public Builder setHandlerType(String handler) {
ozoneHandlerType = Optional.of(handler);
return this;
}
public Builder setTrace(Boolean trace) {
enableTrace = Optional.of(trace);
return this;
}
public Builder setSCMHBInterval(int seconds) {
hbSeconds = Optional.of(seconds);
return this;
}
public Builder setSCMHeartbeatProcessingInterval(int milliseconds) {
hbProcessorInterval = Optional.of(milliseconds);
return this;
}
public Builder setSCMMetadataDir(String scmMetadataDirPath) {
scmMetadataDir = Optional.of(scmMetadataDirPath);
return this;
}
public Builder disableOzone() {
ozoneEnabled = false;
return this;
}
public Builder doNotwaitTobeOutofChillMode() {
waitForChillModeFinish = false;
return this;
}
public Builder setNumOfKSMHandlers(int numOfHandlers) {
numOfKsmHandlers = numOfHandlers;
return this;
}
public Builder setNumOfSCMHandlers(int numOfHandlers) {
numOfScmHandlers = numOfHandlers;
return this;
}
public String getPath() {
return path;
}
public String getRunID() {
return runID.toString();
}
@Override
public MiniOzoneCluster build() throws IOException {
configureHandler();
configureTrace();
configureSCMheartbeat();
configScmMetadata();
configVersionFile();
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY, "127.0.0.1:0");
conf.set(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY, "127.0.0.1:0");
conf.set(KSMConfigKeys.OZONE_KSM_HTTP_ADDRESS_KEY, "127.0.0.1:0");
// Configure KSM and SCM handlers
conf.setInt(ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY, numOfScmHandlers);
conf.setInt(KSMConfigKeys.OZONE_KSM_HANDLER_COUNT_KEY, numOfKsmHandlers);
// Use random ports for ozone containers in mini cluster,
// in order to launch multiple container servers per node.
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
randomContainerPort);
StorageContainerManager scm =
StorageContainerManager.createSCM(null, conf);
scm.start();
KeySpaceManager ksm = new KeySpaceManager(conf);
ksm.start();
String addressString = scm.getDatanodeRpcAddress().getHostString() +
":" + scm.getDatanodeRpcAddress().getPort();
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, addressString);
MiniOzoneCluster cluster = new MiniOzoneCluster(this, scm, ksm);
try {
cluster.waitOzoneReady();
if (waitForChillModeFinish) {
cluster.waitTobeOutOfChillMode();
}
cluster.waitForHeartbeatProcessed();
} catch (Exception e) {
// A workaround to propagate MiniOzoneCluster failures without
// changing the method signature (which would require cascading
// changes to hundreds of unrelated HDFS tests).
throw new IOException("Failed to start MiniOzoneCluster", e);
}
return cluster;
}
private void configScmMetadata() throws IOException {
if (scmMetadataDir.isPresent()) {
// if user specifies a path in the test, it is assumed that user takes
// care of creating and cleaning up that directory after the tests.
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
scmMetadataDir.get());
return;
}
// If user has not specified a path, create a UUID for this miniCluster
// and create SCM under that directory.
Path scmPath = Paths.get(path, runID.toString(), "cont-meta");
Files.createDirectories(scmPath);
Path containerPath = scmPath.resolve(OzoneConsts.CONTAINER_ROOT_PREFIX);
Files.createDirectories(containerPath);
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, scmPath
.toString());
// TODO : Fix this, we need a more generic mechanism to map
// different datanode ID for different datanodes when we have lots of
// datanodes in the cluster.
conf.setStrings(ScmConfigKeys.OZONE_SCM_DATANODE_ID,
scmPath.toString() + "/datanode.id");
}
private void configVersionFile() throws IOException {
SCMStorage scmStore = new SCMStorage(conf);
scmStore.setClusterId(runID.toString());
scmStore.initialize();
}
private void configureHandler() {
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, this.ozoneEnabled);
if (!ozoneHandlerType.isPresent()) {
throw new IllegalArgumentException(
"The Ozone handler type must be specified.");
} else {
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
ozoneHandlerType.get());
}
}
private void configureTrace() {
if (enableTrace.isPresent()) {
conf.setBoolean(OzoneConfigKeys.OZONE_TRACE_ENABLED_KEY,
enableTrace.get());
GenericTestUtils.setRootLogLevel(Level.TRACE);
}
GenericTestUtils.setRootLogLevel(Level.INFO);
}
private void configureSCMheartbeat() {
if (hbSeconds.isPresent()) {
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
hbSeconds.get());
} else {
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
DEFAULT_HB_SECONDS);
}
if (hbProcessorInterval.isPresent()) {
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS,
hbProcessorInterval.get());
} else {
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS,
DEFAULT_PROCESSOR_MS);
}
}
}
}
void waitForHeartbeatProcessed() throws TimeoutException,
InterruptedException;
}

View File

@ -44,10 +44,10 @@ class RatisTestSuite implements Closeable {
static final int NUM_DATANODES = 3;
private final OzoneConfiguration conf;
private final MiniOzoneCluster cluster;
private final MiniOzoneClassicCluster cluster;
/**
* Create a {@link MiniOzoneCluster} for testing by setting
* Create a {@link MiniOzoneClassicCluster} for testing by setting
* OZONE_ENABLED = true,
* RATIS_ENABLED = true, and
* OZONE_HANDLER_TYPE_KEY = "distributed".
@ -61,7 +61,7 @@ public OzoneConfiguration getConf() {
return conf;
}
public MiniOzoneCluster getCluster() {
public MiniOzoneClassicCluster getCluster() {
return cluster;
}
@ -95,15 +95,12 @@ static void initRatisConf(RpcType rpc, Configuration conf) {
+ " = " + rpc.name());
}
static MiniOzoneCluster newMiniOzoneCluster(
static MiniOzoneClassicCluster newMiniOzoneCluster(
int numDatanodes, OzoneConfiguration conf) throws IOException {
final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
final MiniOzoneClassicCluster cluster =
new MiniOzoneClassicCluster.Builder(conf)
.numDataNodes(numDatanodes)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
// cluster.getRatisManager().createPipeline("ratis0",
// cluster.getDataNodes().stream()
// .map(DataNode::getDatanodeId)
// .collect(Collectors.toList()));
return cluster;
}

View File

@ -55,7 +55,7 @@ public static void setup() throws Exception {
ozoneConf = new OzoneConfiguration();
ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
cluster = new MiniOzoneCluster.Builder(ozoneConf).numDataNodes(1)
cluster = new MiniOzoneClassicCluster.Builder(ozoneConf).numDataNodes(1)
.storageCapacities(new long[] {datanodeCapacities, datanodeCapacities})
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
StorageContainerLocationProtocolClientSideTranslatorPB client =

View File

@ -53,7 +53,7 @@
*/
public class TestMiniOzoneCluster {
private static MiniOzoneCluster cluster;
private static MiniOzoneClassicCluster cluster;
private static OzoneConfiguration conf;
private final static File TEST_ROOT = TestGenericTestUtils.getTestDir();
@ -83,8 +83,10 @@ public static void cleanup() {
@Test(timeout = 30000)
public void testStartMultipleDatanodes() throws Exception {
final int numberOfNodes = 3;
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(numberOfNodes)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
cluster = new MiniOzoneClassicCluster.Builder(conf)
.numDataNodes(numberOfNodes)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
.build();
// make sure datanode.id file is correct
File idPath = new File(

View File

@ -34,7 +34,6 @@
import org.apache.hadoop.ozone.scm.StorageContainerManager.StartupOption;
import org.apache.hadoop.ozone.scm.block.DeletedBlockLog;
import org.apache.hadoop.ozone.scm.block.SCMBlockDeletingService;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@ -102,7 +101,7 @@ private void testRpcPermissionWithConf(
OzoneConfiguration ozoneConf, String fakeRemoteUsername,
boolean expectPermissionDenied) throws IOException {
MiniOzoneCluster cluster =
new MiniOzoneCluster.Builder(ozoneConf).numDataNodes(1)
new MiniOzoneClassicCluster.Builder(ozoneConf).numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
try {
@ -192,8 +191,8 @@ public void testBlockDeletionTransactions() throws Exception {
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
numKeys);
MiniOzoneCluster cluster =
new MiniOzoneCluster.Builder(conf).numDataNodes(1)
MiniOzoneClassicCluster cluster =
new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
try {
@ -267,7 +266,7 @@ public void testBlockDeletingThrottling() throws Exception {
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
numKeys);
MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
MiniOzoneClassicCluster cluster = new MiniOzoneClassicCluster.Builder(conf)
.numDataNodes(1).setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
.build();
@ -388,7 +387,7 @@ public void testSCMReinitialization() throws Exception {
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
//This will set the cluster id in the version file
MiniOzoneCluster cluster =
new MiniOzoneCluster.Builder(conf).numDataNodes(1)
new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
StartupOption.INIT.setClusterId("testClusterId");
// This will initialize SCM
@ -397,17 +396,4 @@ public void testSCMReinitialization() throws Exception {
Assert.assertEquals(OzoneConsts.NodeType.SCM, scmStore.getNodeType());
Assert.assertNotEquals("testClusterId", scmStore.getClusterID());
}
@Test
public void testSCMInitializationFailure() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
final String path =
GenericTestUtils.getTempPath(UUID.randomUUID().toString());
Path scmPath = Paths.get(path, "scm-meta");
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
exception.expect(SCMException.class);
exception.expectMessage("SCM not initialized.");
StorageContainerManager.createSCM(null, conf);
}
}

View File

@ -55,11 +55,11 @@
*/
public class TestStorageContainerManagerHelper {
private final MiniOzoneCluster cluster;
private final MiniOzoneClassicCluster cluster;
private final Configuration conf;
private final StorageHandler storageHandler;
public TestStorageContainerManagerHelper(MiniOzoneCluster cluster,
public TestStorageContainerManagerHelper(MiniOzoneClassicCluster cluster,
Configuration conf) throws IOException {
this.cluster = cluster;
this.conf = conf;

View File

@ -21,7 +21,7 @@
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@ -58,7 +58,7 @@ public class TestOzoneRestClient {
@Rule
public ExpectedException thrown = ExpectedException.none();
private static MiniOzoneCluster cluster = null;
private static MiniOzoneClassicCluster cluster = null;
private static OzoneClient ozClient = null;
private static ObjectStore store = null;
@ -75,7 +75,7 @@ public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
DataNode datanode = cluster.getDataNodes().get(0);
conf.set(OzoneConfigKeys.OZONE_CLIENT_PROTOCOL,

View File

@ -20,6 +20,7 @@
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -38,11 +39,10 @@
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.ksm.KeySpaceManager;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.ksm.protocolPB.
KeySpaceManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@ -70,11 +70,10 @@ public class TestOzoneRpcClient {
@Rule
public ExpectedException thrown = ExpectedException.none();
private static MiniOzoneCluster cluster = null;
private static MiniOzoneClassicCluster cluster = null;
private static OzoneClient ozClient = null;
private static ObjectStore store = null;
private static KeySpaceManagerProtocolClientSideTranslatorPB
keySpaceManagerClient;
private static KeySpaceManager keySpaceManager;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
@ -91,7 +90,7 @@ public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(5)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
conf.set("ozone.client.protocol",
"org.apache.hadoop.ozone.client.rpc.RpcClient");
@ -100,7 +99,7 @@ public static void init() throws Exception {
store = ozClient.getObjectStore();
storageContainerLocationClient =
cluster.createStorageContainerLocationClient();
keySpaceManagerClient = cluster.createKeySpaceManagerClient();
keySpaceManager = cluster.getKeySpaceManager();
}
@Test
@ -388,7 +387,7 @@ private boolean verifyRatisReplication(String volumeName, String bucketName,
OzoneProtos.ReplicationType.valueOf(type.toString());
OzoneProtos.ReplicationFactor replicationFactor =
OzoneProtos.ReplicationFactor.valueOf(factor.getValue());
KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs);
KsmKeyInfo keyInfo = keySpaceManager.lookupKey(keyArgs);
for (KsmKeyLocationInfo info: keyInfo.getKeyLocationList()) {
Pipeline pipeline =
storageContainerLocationClient.getContainer(info.getContainerName());
@ -809,10 +808,6 @@ public static void shutdown() throws IOException {
storageContainerLocationClient.close();
}
if (keySpaceManagerClient != null) {
keySpaceManagerClient.close();
}
if (cluster != null) {
cluster.shutdown();
}

View File

@ -207,6 +207,7 @@ public static ContainerCommandRequestProto getWriteChunkRequest(
request.setCmdType(ContainerProtos.Type.WriteChunk);
request.setWriteChunk(writeRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeID(pipeline.getLeader().getDatanodeUuid().toString());
return request.build();
}
@ -251,6 +252,7 @@ public static ContainerCommandRequestProto getWriteSmallFileRequest(
request.setCmdType(ContainerProtos.Type.PutSmallFile);
request.setPutSmallFile(smallFileRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeID(pipeline.getLeader().getDatanodeUuid());
return request.build();
}
@ -260,7 +262,7 @@ public static ContainerCommandRequestProto getReadSmallFileRequest(
throws Exception {
ContainerProtos.GetSmallFileRequestProto.Builder smallFileRequest =
ContainerProtos.GetSmallFileRequestProto.newBuilder();
Pipeline pipeline = Pipeline.getFromProtoBuf(putKey.getPipeline());
ContainerCommandRequestProto getKey = getKeyRequest(putKey);
smallFileRequest.setKey(getKey.getGetKey());
@ -269,6 +271,7 @@ public static ContainerCommandRequestProto getReadSmallFileRequest(
request.setCmdType(ContainerProtos.Type.GetSmallFile);
request.setGetSmallFile(smallFileRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeID(pipeline.getLeader().getDatanodeUuid());
return request.build();
}
@ -288,7 +291,7 @@ public static ContainerCommandRequestProto getReadChunkRequest(
ContainerProtos.ReadChunkRequestProto.Builder readRequest =
ContainerProtos.ReadChunkRequestProto.newBuilder();
Pipeline pipeline = Pipeline.getFromProtoBuf(request.getPipeline());
readRequest.setPipeline(request.getPipeline());
readRequest.setKeyName(request.getKeyName());
@ -299,6 +302,7 @@ public static ContainerCommandRequestProto getReadChunkRequest(
newRequest.setCmdType(ContainerProtos.Type.ReadChunk);
newRequest.setReadChunk(readRequest);
newRequest.setTraceID(UUID.randomUUID().toString());
newRequest.setDatanodeID(pipeline.getLeader().getDatanodeUuid());
return newRequest.build();
}
@ -316,7 +320,7 @@ public static ContainerCommandRequestProto getDeleteChunkRequest(
IOException, NoSuchAlgorithmException {
LOG.trace("deleteChunk key={} from pipeline={}",
writeRequest.getKeyName(), writeRequest.getPipeline());
Pipeline pipeline = Pipeline.getFromProtoBuf(writeRequest.getPipeline());
ContainerProtos.DeleteChunkRequestProto.Builder deleteRequest =
ContainerProtos.DeleteChunkRequestProto
.newBuilder();
@ -330,6 +334,7 @@ public static ContainerCommandRequestProto getDeleteChunkRequest(
request.setCmdType(ContainerProtos.Type.DeleteChunk);
request.setDeleteChunk(deleteRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeID(pipeline.getLeader().getDatanodeUuid());
return request.build();
}
@ -340,7 +345,7 @@ public static ContainerCommandRequestProto getDeleteChunkRequest(
* @return ContainerCommandRequestProto.
*/
public static ContainerCommandRequestProto getCreateContainerRequest(
String containerName) throws IOException {
String containerName, Pipeline pipeline) throws IOException {
LOG.trace("createContainer: {}", containerName);
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
@ -359,6 +364,7 @@ public static ContainerCommandRequestProto getCreateContainerRequest(
request.setCmdType(ContainerProtos.Type.CreateContainer);
request.setCreateContainer(createRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeID(pipeline.getLeader().getDatanodeUuid().toString());
return request.build();
}
@ -388,10 +394,9 @@ public static ContainerCommandRequestProto getUpdateContainerRequest(
kvBuilder.setValue(metaData.get(keys[i]));
containerData.addMetadata(i, kvBuilder.build());
}
updateRequestBuilder.setPipeline(
ContainerTestHelper.createSingleNodePipeline(containerName)
.getProtobufMessage());
Pipeline pipeline =
ContainerTestHelper.createSingleNodePipeline(containerName);
updateRequestBuilder.setPipeline(pipeline.getProtobufMessage());
updateRequestBuilder.setContainerData(containerData.build());
ContainerCommandRequestProto.Builder request =
@ -399,6 +404,7 @@ public static ContainerCommandRequestProto getUpdateContainerRequest(
request.setCmdType(ContainerProtos.Type.UpdateContainer);
request.setUpdateContainer(updateRequestBuilder.build());
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeID(pipeline.getLeader().getDatanodeUuid());
return request.build();
}
/**
@ -432,6 +438,7 @@ public static ContainerCommandRequestProto getPutKeyRequest(
LOG.trace("putKey: {} to pipeline={}",
writeRequest.getKeyName(), writeRequest.getPipeline());
Pipeline pipeline = Pipeline.getFromProtoBuf(writeRequest.getPipeline());
ContainerProtos.PutKeyRequestProto.Builder putRequest =
ContainerProtos.PutKeyRequestProto.newBuilder();
@ -448,6 +455,7 @@ public static ContainerCommandRequestProto getPutKeyRequest(
request.setCmdType(ContainerProtos.Type.PutKey);
request.setPutKey(putRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeID(pipeline.getLeader().getDatanodeUuid());
return request.build();
}
@ -461,6 +469,7 @@ public static ContainerCommandRequestProto getKeyRequest(
ContainerProtos.PutKeyRequestProto putKeyRequest) {
LOG.trace("getKey: name={} from pipeline={}",
putKeyRequest.getKeyData().getName(), putKeyRequest.getPipeline());
Pipeline pipeline = Pipeline.getFromProtoBuf(putKeyRequest.getPipeline());
ContainerProtos.GetKeyRequestProto.Builder getRequest =
ContainerProtos.GetKeyRequestProto.newBuilder();
@ -476,6 +485,7 @@ public static ContainerCommandRequestProto getKeyRequest(
request.setCmdType(ContainerProtos.Type.GetKey);
request.setGetKey(getRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeID(pipeline.getLeader().getDatanodeUuid());
return request.build();
}
@ -503,7 +513,7 @@ public static ContainerCommandRequestProto getDeleteKeyRequest(
ContainerProtos.PutKeyRequestProto putKeyRequest) {
LOG.trace("deleteKey: name={} from pipeline={}",
putKeyRequest.getKeyData().getName(), putKeyRequest.getPipeline());
Pipeline pipeline = Pipeline.getFromProtoBuf(putKeyRequest.getPipeline());
ContainerProtos.DeleteKeyRequestProto.Builder delRequest =
ContainerProtos.DeleteKeyRequestProto.newBuilder();
delRequest.setPipeline(putKeyRequest.getPipeline());
@ -513,6 +523,7 @@ public static ContainerCommandRequestProto getDeleteKeyRequest(
request.setCmdType(ContainerProtos.Type.DeleteKey);
request.setDeleteKey(delRequest);
request.setTraceID(UUID.randomUUID().toString());
request.setDatanodeID(pipeline.getLeader().getDatanodeUuid());
return request.build();
}
@ -531,6 +542,7 @@ public static ContainerCommandRequestProto getCloseContainer(
ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
.Type.CloseContainer).setCloseContainer(closeRequest)
.setTraceID(UUID.randomUUID().toString())
.setDatanodeID(pipeline.getLeader().getDatanodeUuid())
.build();
return cmd;
@ -550,6 +562,7 @@ public static ContainerCommandRequestProto getRequestWithoutTraceId(
ContainerProtos.ContainerCommandRequestProto cmd =
ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
.Type.CloseContainer).setCloseContainer(closeRequest)
.setDatanodeID(pipeline.getLeader().getDatanodeUuid())
.build();
return cmd;
}
@ -570,6 +583,7 @@ public static ContainerCommandRequestProto getDeleteContainer(
.setCmdType(ContainerProtos.Type.DeleteContainer)
.setDeleteContainer(deleteRequest)
.setTraceID(UUID.randomUUID().toString())
.setDatanodeID(pipeline.getLeader().getDatanodeUuid())
.build();
}
}

View File

@ -86,7 +86,7 @@ public void testContainerMetrics() throws Exception {
// Create container
ContainerCommandRequestProto request = ContainerTestHelper
.getCreateContainerRequest(containerName);
.getCreateContainerRequest(containerName, pipeline);
ContainerCommandResponseProto response = client.sendCommand(request);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
Assert.assertEquals(ContainerProtos.Result.SUCCESS,

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
@ -58,7 +59,7 @@ public void testCreateOzoneContainer() throws Exception {
OzoneContainer container = null;
MiniOzoneCluster cluster = null;
try {
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
// We don't start Ozone Container via data node, we will do it
// independently in our test path.
@ -105,7 +106,7 @@ public void testOzoneContainerViaDataNode() throws Exception {
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setRandomContainerPort(false)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
@ -208,7 +209,7 @@ public void testBothGetandPutSmallFile() throws Exception {
OzoneConfiguration conf = newOzoneConfiguration();
client = createClientForTesting(conf);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setRandomContainerPort(false)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
String containerName = client.getPipeline().getContainerName();
@ -266,7 +267,7 @@ public void testCloseContainer() throws Exception {
OzoneConfiguration conf = newOzoneConfiguration();
client = createClientForTesting(conf);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setRandomContainerPort(false)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
client.connect();
@ -356,7 +357,7 @@ public void testDeleteContainer() throws Exception {
OzoneConfiguration conf = newOzoneConfiguration();
client = createClientForTesting(conf);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setRandomContainerPort(false)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
client.connect();
@ -471,7 +472,7 @@ public void testXcieverClientAsync() throws Exception {
OzoneConfiguration conf = newOzoneConfiguration();
client = createClientForTesting(conf);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setRandomContainerPort(false)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
String containerName = client.getPipeline().getContainerName();
@ -492,7 +493,7 @@ public void testInvalidRequest() throws Exception {
OzoneConfiguration conf = newOzoneConfiguration();
client = createClientForTesting(conf);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setRandomContainerPort(false)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
client.connect();
@ -529,7 +530,8 @@ private static void createContainerForTesting(XceiverClientSpi client,
String containerName) throws Exception {
// Create container
ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerTestHelper.getCreateContainerRequest(containerName,
client.getPipeline());
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);

View File

@ -19,7 +19,7 @@
package org.apache.hadoop.ozone.container.ozoneimpl;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.RatisTestHelper;
@ -75,7 +75,8 @@ private static void runTest(
// create Ozone clusters
final OzoneConfiguration conf = newOzoneConfiguration();
RatisTestHelper.initRatisConf(rpc, conf);
final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
final MiniOzoneClassicCluster cluster =
new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL)
.numDataNodes(numNodes)
.build();

View File

@ -20,7 +20,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.RatisTestHelper;
@ -74,7 +74,8 @@ private static void runTestRatisManager(RpcType rpc) throws Exception {
// create Ozone clusters
final OzoneConfiguration conf = newOzoneConfiguration();
RatisTestHelper.initRatisConf(rpc, conf);
final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
final MiniOzoneClassicCluster cluster =
new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL)
.numDataNodes(5)
.build();

View File

@ -75,7 +75,8 @@ public void testPipeline() throws IOException {
channel = new EmbeddedChannel(new XceiverServerHandler(
new TestContainerDispatcher()));
ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerTestHelper.getCreateContainerRequest(containerName,
ContainerTestHelper.createSingleNodePipeline(containerName));
channel.writeInbound(request);
Assert.assertTrue(channel.finish());
ContainerCommandResponseProto response = channel.readOutbound();
@ -172,7 +173,8 @@ static void runTestClientServer(
client.connect();
final ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerTestHelper
.getCreateContainerRequest(containerName, pipeline);
Assert.assertNotNull(request.getTraceID());
ContainerCommandResponseProto response = client.sendCommand(request);
@ -208,7 +210,8 @@ public void testClientServerWithContainerDispatcher() throws Exception {
client.connect();
ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerTestHelper.getCreateContainerRequest(containerName,
pipeline);
ContainerCommandResponseProto response = client.sendCommand(request);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());

View File

@ -22,6 +22,7 @@
import java.io.IOException;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
@ -54,7 +55,7 @@ public void setup() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
ksmManager = cluster.getKeySpaceManager();
}

View File

@ -17,7 +17,7 @@
package org.apache.hadoop.ozone.ksm;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -59,7 +59,7 @@
*/
@RunWith(Parameterized.class)
public class TestKSMSQLCli {
private MiniOzoneCluster cluster = null;
private MiniOzoneClassicCluster cluster = null;
private StorageHandler storageHandler;
private UserArgs userArgs;
private OzoneConfiguration conf;
@ -104,7 +104,7 @@ public void setup() throws Exception {
conf = new OzoneConfiguration();
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
userArgs = new UserArgs(null, OzoneUtils.getRequestID(),

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
@ -94,7 +95,7 @@ public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
userArgs = new UserArgs(null, OzoneUtils.getRequestID(),

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
@ -75,7 +76,7 @@ public static void init() throws Exception {
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 5);
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
userArgs = new UserArgs(null, OzoneUtils.getRequestID(),

View File

@ -39,7 +39,7 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneAcl.OzoneACLRights;
import org.apache.hadoop.ozone.OzoneAcl.OzoneACLType;
@ -81,7 +81,7 @@ public class TestOzoneShell {
private static String url;
private static File baseDir;
private static OzoneConfiguration conf = null;
private static MiniOzoneCluster cluster = null;
private static MiniOzoneClassicCluster cluster = null;
private static OzoneRestClient client = null;
private static Shell shell = null;
@ -114,7 +114,7 @@ public static void init()
shell = new Shell();
shell.setConf(conf);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
DataNode dataNode = cluster.getDataNodes().get(0);
final int port = dataNode.getInfoPort();

View File

@ -19,6 +19,7 @@
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -51,7 +52,7 @@ public class TestAllocateContainer {
public static void init() throws Exception {
long datanodeCapacities = 3 * OzoneConsts.TB;
conf = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(3)
cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(3)
.storageCapacities(new long[] {datanodeCapacities, datanodeCapacities})
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageContainerLocationClient =

View File

@ -19,7 +19,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -82,7 +82,7 @@ public TestContainerSQLCli(String type) {
private static SQLCLI cli;
private MiniOzoneCluster cluster;
private MiniOzoneClassicCluster cluster;
private OzoneConfiguration conf;
private StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
@ -118,7 +118,7 @@ public void setup() throws Exception {
factor = OzoneProtos.ReplicationFactor.ONE;
type = OzoneProtos.ReplicationType.STAND_ALONE;
}
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(2)
cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(2)
.storageCapacities(new long[] {datanodeCapacities, datanodeCapacities})
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageContainerLocationClient =

View File

@ -19,6 +19,7 @@
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -61,7 +62,7 @@ public static void init() throws Exception {
ozoneConfig = new OzoneConfiguration();
ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
cluster = new MiniOzoneCluster.Builder(ozoneConfig).numDataNodes(1)
cluster = new MiniOzoneClassicCluster.Builder(ozoneConfig).numDataNodes(1)
.storageCapacities(new long[] {datanodeCapacities, datanodeCapacities})
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageContainerLocationClient = cluster

View File

@ -19,7 +19,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
@ -58,7 +58,7 @@
public class TestSCMCli {
private static SCMCLI cli;
private static MiniOzoneCluster cluster;
private static MiniOzoneClassicCluster cluster;
private static OzoneConfiguration conf;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
@ -78,7 +78,7 @@ public class TestSCMCli {
@BeforeClass
public static void setup() throws Exception {
conf = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(3)
cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(3)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
xceiverClientManager = new XceiverClientManager(conf);
storageContainerLocationClient =

View File

@ -21,6 +21,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -61,7 +62,7 @@ public class TestSCMMXBean {
public static void init() throws IOException, TimeoutException,
InterruptedException {
conf = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
.build();

View File

@ -26,6 +26,7 @@
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
@ -56,7 +57,7 @@ public void testContainerMetrics() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
try {
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
.numDataNodes(nodeCount).build();

View File

@ -20,6 +20,7 @@
import com.google.common.cache.Cache;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -57,7 +58,7 @@ public class TestXceiverClientManager {
@BeforeClass
public static void init() throws IOException {
config = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(config)
cluster = new MiniOzoneClassicCluster.Builder(config)
.numDataNodes(3)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageContainerLocationClient = cluster

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -63,7 +64,7 @@ public class TestXceiverClientMetrics {
@BeforeClass
public static void init() throws IOException {
config = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(config)
cluster = new MiniOzoneClassicCluster.Builder(config)
.numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageContainerLocationClient = cluster
@ -86,7 +87,7 @@ public void testMetrics() throws Exception {
XceiverClientSpi client = clientManager.acquireClient(pipeline);
ContainerCommandRequestProto request = ContainerTestHelper
.getCreateContainerRequest(containerName);
.getCreateContainerRequest(containerName, pipeline);
client.sendCommand(request);
MetricsRecordBuilder containerMetrics = getMetrics(

View File

@ -18,6 +18,7 @@
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
@ -51,7 +52,7 @@ public class TestContainerStateManager {
@Before
public void setup() throws IOException {
conf = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1)
cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
xceiverClientManager = new XceiverClientManager(conf);
scm = cluster.getStorageContainerManager();

View File

@ -16,7 +16,7 @@
*/
package org.apache.hadoop.ozone.scm.node;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
@ -50,7 +50,7 @@
*/
public class TestQueryNode {
private static int numOfDatanodes = 5;
private MiniOzoneCluster cluster;
private MiniOzoneClassicCluster cluster;
private ContainerOperationClient scmClient;
@ -64,7 +64,7 @@ public void setUp() throws Exception {
conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.numDataNodes(numOfDatanodes)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
.build();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.tools;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
@ -55,7 +56,7 @@ public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
.numDataNodes(5).build();
}

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.ozone.web;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -47,7 +47,7 @@ public class TestDistributedOzoneVolumes extends TestOzoneHelper {
@Rule
public Timeout testTimeout = new Timeout(300000);
private static MiniOzoneCluster cluster = null;
private static MiniOzoneClassicCluster cluster = null;
private static int port = 0;
/**
@ -64,7 +64,7 @@ public static void init() throws Exception {
Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
DataNode dataNode = cluster.getDataNodes().get(0);
port = dataNode.getInfoPort();

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.ozone.web;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -43,7 +43,7 @@ public class TestLocalOzoneVolumes extends TestOzoneHelper {
@Rule
public Timeout testTimeout = new Timeout(300000);
private static MiniOzoneCluster cluster = null;
private static MiniOzoneClassicCluster cluster = null;
private static int port = 0;
/**
@ -67,7 +67,7 @@ public static void init() throws Exception {
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL).build();
DataNode dataNode = cluster.getDataNodes().get(0);
port = dataNode.getInfoPort();

View File

@ -24,6 +24,7 @@
import static org.junit.Assert.*;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.web.client.OzoneRestClient;
import org.junit.AfterClass;
@ -60,7 +61,7 @@ public class TestOzoneRestWithMiniCluster {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1)
cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
cluster.waitOzoneReady();
ozoneClient = cluster.createOzoneRestClient();

View File

@ -19,7 +19,7 @@
package org.apache.hadoop.ozone.web;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -56,7 +56,7 @@ public class TestOzoneWebAccess {
@Rule
public Timeout testTimeout = new Timeout(300000);
private static MiniOzoneCluster cluster;
private static MiniOzoneClassicCluster cluster;
private static int port;
/**
@ -76,7 +76,7 @@ public static void init() throws Exception {
.getTempPath(TestOzoneWebAccess.class.getSimpleName());
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL).build();
DataNode dataNode = cluster.getDataNodes().get(0);
port = dataNode.getInfoPort();

View File

@ -19,7 +19,7 @@
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -54,7 +54,7 @@ public class TestBuckets {
@Rule
public Timeout testTimeout = new Timeout(300000);
private static MiniOzoneCluster cluster = null;
private static MiniOzoneClassicCluster cluster = null;
private static OzoneRestClient ozoneRestClient = null;
/**
@ -77,7 +77,7 @@ public static void init() throws IOException,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
DataNode dataNode = cluster.getDataNodes().get(0);
final int port = dataNode.getInfoPort();

View File

@ -28,7 +28,7 @@
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -84,7 +84,7 @@ public class TestKeys {
@Rule
public Timeout testTimeout = new Timeout(300000);
private static MiniOzoneCluster ozoneCluster = null;
private static MiniOzoneClassicCluster ozoneCluster = null;
private static String path;
private static OzoneRestClient ozoneRestClient = null;
private static long currentTime;
@ -103,7 +103,7 @@ public static void init() throws Exception {
path = GenericTestUtils.getTempPath(TestKeys.class.getSimpleName());
Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
ozoneCluster = new MiniOzoneCluster.Builder(conf)
ozoneCluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
DataNode dataNode = ozoneCluster.getDataNodes().get(0);
final int port = dataNode.getInfoPort();
@ -271,7 +271,7 @@ static void runTestPutKey(PutHelper helper) throws Exception {
}
private static void restartDatanode(
MiniOzoneCluster cluster, int datanodeIdx, OzoneRestClient client)
MiniOzoneClassicCluster cluster, int datanodeIdx, OzoneRestClient client)
throws IOException, OzoneException, URISyntaxException {
cluster.restartDataNode(datanodeIdx);
// refresh the datanode endpoint uri after datanode restart
@ -291,7 +291,7 @@ public void testPutAndGetKeyWithDnRestart() throws Exception {
}
static void runTestPutAndGetKeyWithDnRestart(
PutHelper helper, MiniOzoneCluster cluster) throws Exception {
PutHelper helper, MiniOzoneClassicCluster cluster) throws Exception {
String keyName = helper.putKey().getKeyName();
assertNotNull(helper.getBucket());
assertNotNull(helper.getFile());

View File

@ -18,9 +18,9 @@
package org.apache.hadoop.ozone.web.client;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@ -50,7 +50,7 @@ public class TestKeysRatis {
@Rule
public Timeout testTimeout = new Timeout(300000);
private static RatisTestHelper.RatisTestSuite suite;
private static MiniOzoneCluster ozoneCluster = null;
private static MiniOzoneClassicCluster ozoneCluster = null;
static private String path;
private static OzoneRestClient ozoneRestClient = null;

View File

@ -44,7 +44,7 @@
import io.netty.handler.logging.LoggingHandler;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -85,7 +85,7 @@
public class TestOzoneClient {
private static Logger log = Logger.getLogger(TestOzoneClient.class);
private static int testVolumeCount = 5;
private static MiniOzoneCluster cluster = null;
private static MiniOzoneClassicCluster cluster = null;
private static String endpoint = null;
@BeforeClass
@ -94,7 +94,7 @@ public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
DataNode dataNode = cluster.getDataNodes().get(0);
endpoint = String.format("http://localhost:%d", dataNode.getInfoPort());

View File

@ -22,7 +22,7 @@
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@ -62,7 +62,7 @@
* Test Ozone Volumes Lifecycle.
*/
public class TestVolume {
private static MiniOzoneCluster cluster = null;
private static MiniOzoneClassicCluster cluster = null;
private static OzoneRestClient ozoneRestClient = null;
/**
@ -87,7 +87,7 @@ public static void init() throws Exception {
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
DataNode dataNode = cluster.getDataNodes().get(0);
final int port = dataNode.getInfoPort();

View File

@ -20,7 +20,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
@ -41,7 +41,7 @@ public class TestVolumeRatis {
@Rule
public Timeout testTimeout = new Timeout(300000);
private static OzoneRestClient ozoneClient;
private static MiniOzoneCluster cluster;
private static MiniOzoneClassicCluster cluster;
@BeforeClass
public static void init() throws Exception {
@ -60,7 +60,7 @@ public static void init() throws Exception {
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(3)
cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(3)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
DataNode dataNode = cluster.getDataNodes().get(0);
final int port = dataNode.getInfoPort();

View File

@ -27,7 +27,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.rest.OzoneException;
@ -51,14 +51,14 @@
* create, read, write, getFileStatus
*/
public class TestOzoneFileInterfaces {
private static MiniOzoneCluster cluster = null;
private static MiniOzoneClassicCluster cluster = null;
private static FileSystem fs;
private static StorageHandler storageHandler;
@BeforeClass
public static void init() throws IOException, OzoneException {
OzoneConfiguration conf = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(conf)
cluster = new MiniOzoneClassicCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageHandler =
new ObjectStoreHandler(conf).getStorageHandler();

View File

@ -27,7 +27,7 @@
import org.apache.hadoop.fs.ozone.Constants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
@ -44,7 +44,7 @@
*/
class OzoneContract extends AbstractFSContract {
private static MiniOzoneCluster cluster;
private static MiniOzoneClassicCluster cluster;
private static StorageHandler storageHandler;
private static final String CONTRACT_XML = "contract/ozone.xml";
@ -70,7 +70,7 @@ public static void createCluster() throws IOException {
conf.addResource(CONTRACT_XML);
cluster =
new MiniOzoneCluster.Builder(conf).numDataNodes(5)
new MiniOzoneClassicCluster.Builder(conf).numDataNodes(5)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
cluster.waitClusterUp();
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();