HDFS-11519. Ozone: Implement XceiverServerSpi and XceiverClientSpi using Ratis. Contributed by Tsz Wo Nicholas Sze.

This commit is contained in:
Anu Engineer 2017-04-03 12:40:06 -07:00 committed by Owen O'Malley
parent 3966f42895
commit 68da45a789
17 changed files with 729 additions and 55 deletions

View File

@ -113,6 +113,31 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-proto-shaded</artifactId>
</dependency>
<dependency>
<artifactId>ratis-common</artifactId>
<groupId>org.apache.ratis</groupId>
</dependency>
<dependency>
<artifactId>ratis-client</artifactId>
<groupId>org.apache.ratis</groupId>
</dependency>
<dependency>
<artifactId>ratis-server</artifactId>
<groupId>org.apache.ratis</groupId>
</dependency>
<dependency>
<artifactId>ratis-netty</artifactId>
<groupId>org.apache.ratis</groupId>
</dependency>
<dependency>
<artifactId>ratis-grpc</artifactId>
<groupId>org.apache.ratis</groupId>
</dependency>
</dependencies>
<build>

View File

@ -35,6 +35,15 @@ public final class ScmConfigKeys {
public static final int SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT =
10000;
public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
= "dfs.container.ratis.enabled";
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
= false;
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY
= "dfs.container.ratis.rpc.type";
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT
= "GRPC";
// TODO : this is copied from OzoneConsts, may need to move to a better place
public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB

View File

@ -52,6 +52,7 @@ public class XceiverClientManager {
private final Configuration conf;
private Cache<String, XceiverClientWithAccessInfo> openClient;
private final long staleThresholdMs;
private final boolean useRatis;
/**
* Creates a new XceiverClientManager.
@ -63,6 +64,9 @@ public XceiverClientManager(Configuration conf) {
this.staleThresholdMs = conf.getTimeDuration(
SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY,
SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS);
this.useRatis = conf.getBoolean(
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
this.conf = conf;
this.openClient = CacheBuilder.newBuilder()
.expireAfterAccess(this.staleThresholdMs, TimeUnit.MILLISECONDS)
@ -109,7 +113,9 @@ public XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException {
return info.getXceiverClient();
} else {
// connection not found, create new, add reference and return
XceiverClientSpi xceiverClient = new XceiverClient(pipeline, conf);
final XceiverClientSpi xceiverClient = useRatis?
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)
: new XceiverClient(pipeline, conf);
try {
xceiverClient.connect();
} catch (Exception e) {

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.scm;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.ratis.client.ClientFactory;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/**
* An abstract implementation of {@link XceiverClientSpi} using Ratis.
* The underlying RPC mechanism can be chosen via the constructor.
*/
public final class XceiverClientRatis implements XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);
public static XceiverClientRatis newXceiverClientRatis(
Pipeline pipeline, Configuration ozoneConf) {
final String rpcType = ozoneConf.get(
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
return new XceiverClientRatis(pipeline,
SupportedRpcType.valueOfIgnoreCase(rpcType));
}
private final Pipeline pipeline;
private final RaftClient client;
/** Constructs a client. */
XceiverClientRatis(Pipeline pipeline, RpcType rpcType) {
this.pipeline = pipeline;
final List<RaftPeer> peers = pipeline.getMachines().stream()
.map(dn -> dn.getXferAddr())
.map(addr -> new RaftPeer(new RaftPeerId(addr), addr))
.collect(Collectors.toList());
final RaftProperties properties = new RaftProperties();
final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(
properties, null));
client = RaftClient.newBuilder()
.setClientRpc(factory.newRaftClientRpc())
.setServers(peers)
.setLeaderId(new RaftPeerId(pipeline.getLeader().getXferAddr()))
.setProperties(properties)
.build();
}
@Override
public Pipeline getPipeline() {
return pipeline;
}
@Override
public void connect() throws Exception {
// do nothing.
}
@Override
public void close() {
try {
client.close();
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
@Override
public ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request) throws IOException {
LOG.debug("sendCommand {}", request);
final RaftClientReply reply = client.send(
() -> ShadedProtoUtil.asShadedByteString(request.toByteArray()));
LOG.debug("reply {}", reply);
Preconditions.checkState(reply.isSuccess());
return ContainerCommandResponseProto.parseFrom(
ShadedProtoUtil.asByteString(reply.getMessage().getContent()));
}
}

View File

@ -155,4 +155,14 @@ public byte[] getData() {
return null;
}
}
@Override
public String toString() {
final StringBuilder b = new StringBuilder(getClass().getSimpleName())
.append("[");
datanodes.keySet().stream()
.forEach(id -> b.append(id.endsWith(leaderID)? "*" + id : id));
b.append("] container:").append(containerName);
return b.toString();
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.shaded.com.google.protobuf;
/** Utilities for the shaded protobuf in Ratis. */
public interface ShadedProtoUtil {
/**
* @param bytes
* @return the wrapped shaded {@link ByteString} (no coping).
*/
static ByteString asShadedByteString(byte[] bytes) {
return ByteString.wrap(bytes);
}
/**
* @param shaded
* @return a {@link com.google.protobuf.ByteString} (require coping).
*/
static com.google.protobuf.ByteString asByteString(ByteString shaded) {
return com.google.protobuf.ByteString.copyFrom(
shaded.asReadOnlyByteBuffer());
}
}

View File

@ -212,6 +212,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.scm.ScmConfigKeys;
/**
* This class contains constants for configuration keys used in Ozone.
@ -67,6 +68,21 @@ public final class OzoneConfigKeys {
"ozone.container.task.wait.seconds";
public static final long OZONE_CONTAINER_TASK_WAIT_DEFAULT = 5;
public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
= ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
= ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY
= ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY;
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT
= ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT;
public static final String DFS_CONTAINER_RATIS_CONF =
"dfs.container.ratis.conf";
public static final String DFS_CONTAINER_RATIS_DATANODE_ADDRESS =
"dfs.container.ratis.datanode.address";
public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
"dfs.container.ratis.datanode.storage.dir";
/**
* There is no need to instantiate this class.
*/

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import com.google.protobuf.ByteString;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
import org.apache.ratis.statemachine.BaseStateMachine;
import org.apache.ratis.statemachine.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
/** A {@link org.apache.ratis.statemachine.StateMachine} for containers. */
public class ContainerStateMachine extends BaseStateMachine {
static final Logger LOG = LoggerFactory.getLogger(
ContainerStateMachine.class);
private final SimpleStateMachineStorage storage
= new SimpleStateMachineStorage();
private final ContainerDispatcher dispatcher;
ContainerStateMachine(ContainerDispatcher dispatcher) {
this.dispatcher = dispatcher;
}
@Override
public StateMachineStorage getStateMachineStorage() {
return storage;
}
@Override
public void initialize(
RaftPeerId id, RaftProperties properties, RaftStorage raftStorage)
throws IOException {
super.initialize(id, properties, raftStorage);
storage.init(raftStorage);
// TODO handle snapshots
}
@Override
public CompletableFuture<RaftClientReply> query(RaftClientRequest request) {
return dispatch(ShadedProtoUtil.asByteString(
request.getMessage().getContent()),
response -> new RaftClientReply(request,
() -> ShadedProtoUtil.asShadedByteString(response.toByteArray())));
}
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
final SMLogEntryProto logEntry = trx.getSMLogEntry().get();
return dispatch(ShadedProtoUtil.asByteString(logEntry.getData()),
response ->
() -> ShadedProtoUtil.asShadedByteString(response.toByteArray())
);
}
private <T> CompletableFuture<T> dispatch(
ByteString requestBytes, Function<ContainerCommandResponseProto, T> f) {
final ContainerCommandResponseProto response;
try {
final ContainerCommandRequestProto request
= ContainerCommandRequestProto.parseFrom(requestBytes);
LOG.trace("dispatch {}", request);
response = dispatcher.dispatch(request);
LOG.trace("response {}", response);
} catch (IOException e) {
return completeExceptionally(e);
}
return CompletableFuture.completedFuture(f.apply(response));
}
static <T> CompletableFuture<T> completeExceptionally(Exception e) {
final CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
}

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
* Creates a ratis server endpoint that acts as the communication layer for
* Ozone containers.
*/
public final class XceiverServerRatis implements XceiverServerSpi {
static RaftProperties newRaftProperties(
RpcType rpc, int port, String storageDir) {
final RaftProperties properties = new RaftProperties();
RaftServerConfigKeys.setStorageDir(properties, storageDir);
RaftConfigKeys.Rpc.setType(properties, rpc);
if (rpc == SupportedRpcType.GRPC) {
GrpcConfigKeys.Server.setPort(properties, port);
} else if (rpc == SupportedRpcType.NETTY) {
NettyConfigKeys.Server.setPort(properties, port);
}
return properties;
}
public static XceiverServerRatis newXceiverServerRatis(
Configuration ozoneConf, ContainerDispatcher dispatcher)
throws IOException {
final String id = ozoneConf.get(
OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS);
final Collection<String> servers = ozoneConf.getStringCollection(
OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF);
final String storageDir = ozoneConf.get(
OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
final String rpcType = ozoneConf.get(
OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
return new XceiverServerRatis(id, servers, storageDir, dispatcher, rpc);
}
private final int port;
private final RaftServer server;
private XceiverServerRatis(
String id, Collection<String> servers, String storageDir,
ContainerDispatcher dispatcher, RpcType rpcType) throws IOException {
Preconditions.checkArgument(servers.contains(id),
"%s is not one of %s specified in %s",
id, servers, OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF);
final List<RaftPeer> peers = servers.stream()
.map(addr -> new RaftPeer(new RaftPeerId(addr), addr))
.collect(Collectors.toList());
this.port = NetUtils.createSocketAddr(id).getPort();
this.server = RaftServer.newBuilder()
.setServerId(new RaftPeerId(id))
.setPeers(peers)
.setProperties(newRaftProperties(rpcType, port, storageDir))
.setStateMachine(new ContainerStateMachine(dispatcher))
.build();
}
@Override
public void start() throws IOException {
server.start();
}
@Override
public void stop() {
try {
server.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public int getIPCPort() {
return port;
}
}

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@ -86,7 +87,13 @@ public OzoneContainer(
manager.setKeyManager(this.keyManager);
this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
server = new XceiverServer(this.ozoneConfig, this.dispatcher);
final boolean useRatis = ozoneConfig.getBoolean(
OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
server = useRatis?
XceiverServerRatis.newXceiverServerRatis(ozoneConfig, dispatcher)
: new XceiverServer(this.ozoneConfig, this.dispatcher);
}
/**

View File

@ -1609,7 +1609,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
dnConf.addResource(dnConfOverlays[i]);
}
// Set up datanode address
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
setupDatanodeAddress(i, dnConf, setupHostsFile, checkDataNodeAddrConfig);
if (manageDfsDirs) {
String dirs = makeDataNodeDirs(i, storageTypes == null ?
null : storageTypes[i - curDatanodesNum]);
@ -2911,16 +2911,19 @@ public File getProvidedStorageDir(int dnIndex, int dirIndex) {
/**
* Get a storage directory for a datanode.
* For examples,
* <ol>
* <li><base directory>/data/data<2*dnIndex + 1></li>
* <li><base directory>/data/data<2*dnIndex + 2></li>
* <li><base directory>/data/dn0_data0</li>
* <li><base directory>/data/dn0_data1</li>
* <li><base directory>/data/dn1_data0</li>
* <li><base directory>/data/dn1_data1</li>
* </ol>
*
* @param dnIndex datanode index (starts from 0)
* @param dirIndex directory index.
* @return Storage directory
*/
public File getStorageDir(int dnIndex, int dirIndex) {
public static File getStorageDir(int dnIndex, int dirIndex) {
return new File(getBaseDirectory(), getStorageDirPath(dnIndex, dirIndex));
}
@ -2931,8 +2934,8 @@ public File getStorageDir(int dnIndex, int dirIndex) {
* @param dirIndex directory index.
* @return storage directory path
*/
private String getStorageDirPath(int dnIndex, int dirIndex) {
return "data/data" + (storagesPerDatanode * dnIndex + 1 + dirIndex);
private static String getStorageDirPath(int dnIndex, int dirIndex) {
return "data/dn" + dnIndex + "_data" + dirIndex;
}
/**
@ -3197,35 +3200,36 @@ public void setBlockRecoveryTimeout(long timeout) {
}
}
protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException {
protected void setupDatanodeAddress(
int i, Configuration dnConf, boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException {
if (setupHostsFile) {
String hostsFile = conf.get(DFS_HOSTS, "").trim();
String hostsFile = dnConf.get(DFS_HOSTS, "").trim();
if (hostsFile.length() == 0) {
throw new IOException("Parameter dfs.hosts is not setup in conf");
}
// Setup datanode in the include file, if it is defined in the conf
String address = "127.0.0.1:" + NetUtils.getFreeSocketPort();
if (checkDataNodeAddrConfig) {
conf.setIfUnset(DFS_DATANODE_ADDRESS_KEY, address);
dnConf.setIfUnset(DFS_DATANODE_ADDRESS_KEY, address);
} else {
conf.set(DFS_DATANODE_ADDRESS_KEY, address);
dnConf.set(DFS_DATANODE_ADDRESS_KEY, address);
}
addToFile(hostsFile, address);
LOG.info("Adding datanode " + address + " to hosts file " + hostsFile);
} else {
if (checkDataNodeAddrConfig) {
conf.setIfUnset(DFS_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
dnConf.setIfUnset(DFS_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
} else {
conf.set(DFS_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
dnConf.set(DFS_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
}
}
if (checkDataNodeAddrConfig) {
conf.setIfUnset(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
conf.setIfUnset(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0");
dnConf.setIfUnset(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
dnConf.setIfUnset(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0");
} else {
conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0");
dnConf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
dnConf.set(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0");
}
}

View File

@ -117,7 +117,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
Configuration dnConf = new HdfsConfiguration(conf);
// Set up datanode address
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
setupDatanodeAddress(i, dnConf, setupHostsFile, checkDataNodeAddrConfig);
if (manageDfsDirs) {
String dirs = makeDataNodeDirs(i, storageTypes == null ? null : storageTypes[i]);
dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);

View File

@ -22,6 +22,7 @@
import com.google.common.base.Supplier;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
@ -85,6 +86,33 @@ private MiniOzoneCluster(Builder builder, StorageContainerManager scm)
tempPath = Paths.get(builder.getPath(), builder.getRunID());
}
@Override
protected void setupDatanodeAddress(
int i, Configuration dnConf, boolean setupHostsFile,
boolean checkDnAddrConf) throws IOException {
super.setupDatanodeAddress(i, dnConf, setupHostsFile, checkDnAddrConf);
final boolean useRatis = dnConf.getBoolean(
OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
if (!useRatis) {
return;
}
final String[] ids = dnConf.getStrings(
OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF);
// TODO: use the i-th raft server as the i-th datanode address
// this only work for one Raft cluster
setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS,
ids[i]);
setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
getInstanceStorageDir(i, -1).getCanonicalPath());
}
static void setConf(int i, Configuration conf, String key, String value) {
conf.set(key, value);
LOG.info("dn{}: set {} = {}", i, key, value);
}
@Override
public void close() {
shutdown();

View File

@ -21,17 +21,22 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.ratis.rpc.RpcType;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ServerSocket;
@ -42,11 +47,14 @@
import java.util.Random;
import java.util.UUID;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Helpers for container tests.
*/
public final class ContainerTestHelper {
public static final Logger LOG = LoggerFactory.getLogger(
ContainerTestHelper.class);
private static Random r = new Random();
/**
@ -64,19 +72,50 @@ private ContainerTestHelper() {
*/
public static Pipeline createSingleNodePipeline(String containerName) throws
IOException {
return createPipeline(containerName, 1);
}
public static DatanodeID createDatanodeID() throws IOException {
ServerSocket socket = new ServerSocket(0);
int port = socket.getLocalPort();
DatanodeID datanodeID = new DatanodeID(socket.getInetAddress()
.getHostAddress(), socket.getInetAddress().getHostName(),
UUID.randomUUID().toString(), port, port, port, port);
datanodeID.setContainerPort(port);
Pipeline pipeline = new Pipeline(datanodeID.getDatanodeUuid());
pipeline.addMember(datanodeID);
pipeline.setContainerName(containerName);
socket.close();
return datanodeID;
}
/**
* Create a pipeline with single node replica.
*
* @return Pipeline with single node in it.
* @throws IOException
*/
public static Pipeline createPipeline(String containerName, int numNodes)
throws IOException {
Preconditions.checkArgument(numNodes >= 1);
final DatanodeID leader = createDatanodeID();
Pipeline pipeline = new Pipeline(leader.getDatanodeUuid());
pipeline.setContainerName(containerName);
pipeline.addMember(leader);
for(int i = 1; i < numNodes; i++) {
pipeline.addMember(createDatanodeID());
}
return pipeline;
}
public static void initRatisConf(
RpcType rpc, Pipeline pipeline, Configuration conf) {
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, true);
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, rpc.name());
conf.setStrings(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF,
pipeline.getMachines().stream()
.map(dn -> dn.getXferAddr())
.collect(Collectors.joining(",")));
}
/**
* Creates a ChunkInfo for testing.
*
@ -133,6 +172,8 @@ public static void setDataChecksum(ChunkInfo info, byte[] data)
public static ContainerCommandRequestProto getWriteChunkRequest(
Pipeline pipeline, String containerName, String keyName, int datalen)
throws IOException, NoSuchAlgorithmException {
LOG.trace("writeChunk {} (key={}) to pipeline=",
datalen, keyName, pipeline);
ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
ContainerProtos.WriteChunkRequestProto
.newBuilder();
@ -225,6 +266,9 @@ public static ContainerCommandRequestProto getReadSmallFileRequest(
public static ContainerCommandRequestProto getReadChunkRequest(
ContainerProtos.WriteChunkRequestProto request)
throws IOException, NoSuchAlgorithmException {
LOG.trace("readChunk key={} from pipeline={}",
request.getKeyName(), request.getPipeline());
ContainerProtos.ReadChunkRequestProto.Builder readRequest =
ContainerProtos.ReadChunkRequestProto.newBuilder();
@ -252,6 +296,9 @@ public static ContainerCommandRequestProto getDeleteChunkRequest(
ContainerProtos.WriteChunkRequestProto writeRequest)
throws
IOException, NoSuchAlgorithmException {
LOG.trace("deleteChunk key={} from pipeline={}",
writeRequest.getKeyName(), writeRequest.getPipeline());
ContainerProtos.DeleteChunkRequestProto.Builder deleteRequest =
ContainerProtos.DeleteChunkRequestProto
.newBuilder();
@ -275,6 +322,8 @@ public static ContainerCommandRequestProto getDeleteChunkRequest(
*/
public static ContainerCommandRequestProto getCreateContainerRequest(
String containerName) throws IOException {
LOG.trace("createContainer: {}", containerName);
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
ContainerProtos.CreateContainerRequestProto
.newBuilder();
@ -358,6 +407,9 @@ public static ContainerCommandRequestProto getUpdateContainerRequest(
*/
public static ContainerCommandRequestProto getPutKeyRequest(
ContainerProtos.WriteChunkRequestProto writeRequest) {
LOG.trace("putKey: {} to pipeline={}",
writeRequest.getKeyName(), writeRequest.getPipeline());
ContainerProtos.PutKeyRequestProto.Builder putRequest =
ContainerProtos.PutKeyRequestProto.newBuilder();
@ -384,6 +436,9 @@ public static ContainerCommandRequestProto getPutKeyRequest(
*/
public static ContainerCommandRequestProto getKeyRequest(
ContainerProtos.PutKeyRequestProto putKeyRequest) {
LOG.trace("getKey: name={} from pipeline={}",
putKeyRequest.getKeyData().getName(), putKeyRequest.getPipeline());
ContainerProtos.GetKeyRequestProto.Builder getRequest =
ContainerProtos.GetKeyRequestProto.newBuilder();
ContainerProtos.KeyData.Builder keyData = ContainerProtos.KeyData
@ -422,6 +477,9 @@ public static void verifyGetKey(ContainerCommandRequestProto request,
*/
public static ContainerCommandRequestProto getDeleteKeyRequest(
ContainerProtos.PutKeyRequestProto putKeyRequest) {
LOG.trace("deleteKey: name={} from pipeline={}",
putKeyRequest.getKeyData().getName(), putKeyRequest.getPipeline());
ContainerProtos.DeleteKeyRequestProto.Builder delRequest =
ContainerProtos.DeleteKeyRequestProto.newBuilder();
delRequest.setPipeline(putKeyRequest.getPipeline());

View File

@ -25,7 +25,11 @@
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientRatis;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -88,20 +92,69 @@ public void testCreateOzoneContainer() throws Exception {
}
}
@Test
public void testOzoneContainerViaDataNodeRatisGrpc() throws Exception {
runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.GRPC, 1);
runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.GRPC, 3);
}
@Test
public void testOzoneContainerViaDataNodeRatisNetty() throws Exception {
runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.NETTY, 1);
runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.NETTY, 3);
}
private static void runTestOzoneContainerViaDataNodeRatis(
RpcType rpc, int numNodes) throws Exception {
ContainerTestHelper.LOG.info("runTestOzoneContainerViaDataNodeRatis(rpc="
+ rpc + ", numNodes=" + numNodes);
final String containerName = OzoneUtils.getRequestID();
final Pipeline pipeline = ContainerTestHelper.createPipeline(
containerName, numNodes);
final OzoneConfiguration conf = initOzoneConfiguration(pipeline);
ContainerTestHelper.initRatisConf(rpc, pipeline, conf);
final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType("local")
.numDataNodes(pipeline.getMachines().size())
.build();
cluster.waitOzoneReady();
final XceiverClientSpi client = XceiverClientRatis.newXceiverClientRatis(
pipeline, conf);
try {
runTestOzoneContainerViaDataNode(containerName, pipeline, client);
} finally {
cluster.shutdown();
}
}
private static OzoneConfiguration initOzoneConfiguration(Pipeline pipeline) {
final OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
setOzoneLocalStorageRoot(conf);
return conf;
}
private static void setOzoneLocalStorageRoot(OzoneConfiguration conf) {
URL p = conf.getClass().getResource("");
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
path += conf.getTrimmed(
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
}
@Test
public void testOzoneContainerViaDataNode() throws Exception {
MiniOzoneCluster cluster = null;
XceiverClient client = null;
try {
String keyName = OzoneUtils.getRequestID();
String containerName = OzoneUtils.getRequestID();
OzoneConfiguration conf = new OzoneConfiguration();
URL p = conf.getClass().getResource("");
String path = p.getPath().concat(
TestOzoneContainer.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
setOzoneLocalStorageRoot(conf);
// Start ozone container Via Datanode create.
@ -115,19 +168,32 @@ public void testOzoneContainerViaDataNode() throws Exception {
.setHandlerType("distributed").build();
// This client talks to ozone container via datanode.
client = new XceiverClient(pipeline, conf);
XceiverClient client = new XceiverClient(pipeline, conf);
runTestOzoneContainerViaDataNode(containerName, pipeline, client);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
static void runTestOzoneContainerViaDataNode(
String containerName, Pipeline pipeline, XceiverClientSpi client)
throws Exception {
try {
client.connect();
// Create container
ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
pipeline.setContainerName(containerName);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Write Chunk
final String keyName = OzoneUtils.getRequestID();
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper.getWriteChunkRequest(pipeline, containerName,
keyName, 1024);
@ -204,9 +270,6 @@ public void testOzoneContainerViaDataNode() throws Exception {
if (client != null) {
client.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}

View File

@ -22,29 +22,43 @@
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerHandler;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientRatis;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.rpc.RpcType;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
import static org.mockito.Mockito.mock;
/**
* Test Containers.
*/
public class TestContainerServer {
static final String TEST_DIR
= GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
@Test
public void testPipeline() throws IOException {
@ -68,33 +82,87 @@ public void testPipeline() throws IOException {
@Test
public void testClientServer() throws Exception {
XceiverServer server = null;
XceiverClient client = null;
runTestClientServer(1,
(pipeline, conf) -> conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort()),
XceiverClient::new,
(dn, conf) -> new XceiverServer(conf, new TestContainerDispatcher()));
}
@FunctionalInterface
interface CheckedBiFunction<LEFT, RIGHT, OUT, THROWABLE extends Throwable> {
OUT apply(LEFT left, RIGHT right) throws THROWABLE;
}
@Test
public void testClientServerRatisNetty() throws Exception {
runTestClientServerRatis(NETTY, 1);
runTestClientServerRatis(NETTY, 3);
}
@Test
public void testClientServerRatisGrpc() throws Exception {
runTestClientServerRatis(GRPC, 1);
runTestClientServerRatis(GRPC, 3);
}
static XceiverServerRatis newXceiverServerRatis(
DatanodeID dn, OzoneConfiguration conf) throws IOException {
final String id = dn.getXferAddr();
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS, id);
final String dir = TEST_DIR + id.replace(':', '_');
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
final ContainerDispatcher dispatcher = new TestContainerDispatcher();
return XceiverServerRatis.newXceiverServerRatis(conf, dispatcher);
}
static void runTestClientServerRatis(RpcType rpc, int numNodes)
throws Exception {
runTestClientServer(numNodes,
(pipeline, conf) -> ContainerTestHelper.initRatisConf(
rpc, pipeline, conf),
XceiverClientRatis::newXceiverClientRatis,
TestContainerServer::newXceiverServerRatis);
}
static void runTestClientServer(
int numDatanodes,
BiConsumer<Pipeline, OzoneConfiguration> initConf,
CheckedBiFunction<Pipeline, OzoneConfiguration, XceiverClientSpi,
IOException> createClient,
CheckedBiFunction<DatanodeID, OzoneConfiguration, XceiverServerSpi,
IOException> createServer)
throws Exception {
final List<XceiverServerSpi> servers = new ArrayList<>();
XceiverClientSpi client = null;
String containerName = OzoneUtils.getRequestID();
try {
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(
containerName);
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
final Pipeline pipeline = ContainerTestHelper.createPipeline(
containerName, numDatanodes);
final OzoneConfiguration conf = new OzoneConfiguration();
initConf.accept(pipeline, conf);
server = new XceiverServer(conf, new TestContainerDispatcher());
client = new XceiverClient(pipeline, conf);
for(DatanodeID dn : pipeline.getMachines()) {
final XceiverServerSpi s = createServer.apply(dn, conf);
servers.add(s);
s.start();
}
server.start();
client = createClient.apply(pipeline, conf);
client.connect();
ContainerCommandRequestProto request =
final ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
Assert.assertNotNull(request.getTraceID());
ContainerCommandResponseProto response = client.sendCommand(request);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
Assert.assertEquals(request.getTraceID(), response.getTraceID());
} finally {
if (client != null) {
client.close();
}
if (server != null) {
server.stop();
}
servers.stream().forEach(XceiverServerSpi::stop);
}
}