HDDS-1865. Use "ozone.network.topology.aware.read" to control both RPC client and server side logic (#1184)

This commit is contained in:
Sammi Chen 2019-08-08 07:59:13 +08:00 committed by Xiaoyu Yao
parent 3cc0ace203
commit 8f9245bc2d
16 changed files with 104 additions and 132 deletions

View File

@ -99,9 +99,9 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
this.metrics = XceiverClientManager.getXceiverClientMetrics();
this.channels = new HashMap<>();
this.asyncStubs = new HashMap<>();
this.topologyAwareRead = Boolean.parseBoolean(config.get(
ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED,
ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT));
this.topologyAwareRead = config.getBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
}
/**

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
@ -102,9 +103,9 @@ public void onRemoval(
}
}
}).build();
topologyAwareRead = Boolean.parseBoolean(conf.get(
ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED,
ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT));
topologyAwareRead = conf.getBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
}
@VisibleForTesting

View File

@ -372,10 +372,6 @@ public final class ScmConfigKeys {
"ozone.scm.network.topology.schema.file";
public static final String OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT =
"network-topology-default.xml";
public static final String DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED =
"dfs.network.topology.aware.read.enable";
public static final String DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT =
"false";
public static final String HDDS_TRACING_ENABLED = "hdds.tracing.enabled";
public static final boolean HDDS_TRACING_ENABLED_DEFAULT = true;

View File

@ -1,4 +1,4 @@
/**
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -29,7 +29,7 @@
import java.util.concurrent.TimeUnit;
/**
/**
* This class contains constants for configuration keys used in Ozone.
*/
@InterfaceAudience.Public
@ -447,6 +447,10 @@ public final class OzoneConfigKeys {
OZONE_FREON_HTTP_KERBEROS_KEYTAB_FILE_KEY =
"ozone.freon.http.kerberos.keytab";
public static final String OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY =
"ozone.network.topology.aware.read";
public static final boolean OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT = false;
/**
* There is no need to instantiate this class.
*/

View File

@ -2363,7 +2363,7 @@
</description>
</property>
<property>
<name>dfs.network.topology.aware.read.enable</name>
<name>ozone.network.topology.aware.read</name>
<value>false</value>
<tag>OZONE, PERFORMANCE</tag>
<description>

View File

@ -52,6 +52,8 @@ public static ContainerPlacementPolicy getPolicy(Configuration conf,
try {
constructor = placementClass.getDeclaredConstructor(NodeManager.class,
Configuration.class, NetworkTopology.class, boolean.class);
LOG.info("Create container placement policy of type " +
placementClass.getCanonicalName());
} catch (NoSuchMethodException e) {
String msg = "Failed to find constructor(NodeManager, Configuration, " +
"NetworkTopology, boolean) for class " +

View File

@ -97,7 +97,10 @@ public List<DatanodeDetails> chooseDatanodes(
int datanodeCount = networkTopology.getNumOfLeafNode(NetConstants.ROOT);
int excludedNodesCount = excludedNodes == null ? 0 : excludedNodes.size();
if (datanodeCount < nodesRequired + excludedNodesCount) {
throw new SCMException("No enough datanodes to choose.", null);
throw new SCMException("No enough datanodes to choose. " +
"TotalNode = " + datanodeCount +
"RequiredNode = " + nodesRequired +
"ExcludedNode = " + excludedNodesCount, null);
}
List<DatanodeDetails> mutableFavoredNodes = favoredNodes;
// sanity check of favoredNodes

View File

@ -134,6 +134,7 @@ public class RpcClient implements ClientProtocol {
private final int maxRetryCount;
private final long retryInterval;
private Text dtService;
private final boolean topologyAwareReadEnabled;
/**
* Creates RpcClient instance with the given configuration.
@ -228,6 +229,9 @@ public RpcClient(Configuration conf) throws IOException {
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
ByteStringHelper.init(isUnsafeByteOperationsEnabled);
topologyAwareReadEnabled = conf.getBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
}
private InetSocketAddress getScmAddressForClient() throws IOException {
@ -658,6 +662,7 @@ public OzoneInputStream getKey(
.setBucketName(bucketName)
.setKeyName(keyName)
.setRefreshPipeline(true)
.setSortDatanodesInPipeline(topologyAwareReadEnabled)
.build();
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
return createInputStream(keyInfo);
@ -721,6 +726,7 @@ public OzoneKeyDetails getKeyDetails(
.setBucketName(bucketName)
.setKeyName(keyName)
.setRefreshPipeline(true)
.setSortDatanodesInPipeline(topologyAwareReadEnabled)
.build();
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
@ -981,6 +987,7 @@ public OzoneInputStream readFile(String volumeName, String bucketName,
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setSortDatanodesInPipeline(topologyAwareReadEnabled)
.build();
OmKeyInfo keyInfo = ozoneManagerClient.lookupFile(keyArgs);
return createInputStream(keyInfo);

View File

@ -46,6 +46,7 @@ public final class OmKeyArgs implements Auditable {
private final int multipartUploadPartNumber;
private Map<String, String> metadata;
private boolean refreshPipeline;
private boolean sortDatanodesInPipeline;
private List<OzoneAcl> acls;
@SuppressWarnings("parameternumber")
@ -54,7 +55,7 @@ private OmKeyArgs(String volumeName, String bucketName, String keyName,
List<OmKeyLocationInfo> locationInfoList, boolean isMultipart,
String uploadID, int partNumber,
Map<String, String> metadataMap, boolean refreshPipeline,
List<OzoneAcl> acls) {
List<OzoneAcl> acls, boolean sortDatanode) {
this.volumeName = volumeName;
this.bucketName = bucketName;
this.keyName = keyName;
@ -68,6 +69,7 @@ private OmKeyArgs(String volumeName, String bucketName, String keyName,
this.metadata = metadataMap;
this.refreshPipeline = refreshPipeline;
this.acls = acls;
this.sortDatanodesInPipeline = sortDatanode;
}
public boolean getIsMultipartKey() {
@ -134,6 +136,10 @@ public boolean getRefreshPipeline() {
return refreshPipeline;
}
public boolean getSortDatanodes() {
return sortDatanodesInPipeline;
}
@Override
public Map<String, String> toAuditMap() {
Map<String, String> auditMap = new LinkedHashMap<>();
@ -174,6 +180,7 @@ public static class Builder {
private int multipartUploadPartNumber;
private Map<String, String> metadata = new HashMap<>();
private boolean refreshPipeline;
private boolean sortDatanodesInPipeline;
private List<OzoneAcl> acls;
public Builder setVolumeName(String volume) {
@ -246,10 +253,16 @@ public Builder setRefreshPipeline(boolean refresh) {
return this;
}
public Builder setSortDatanodesInPipeline(boolean sort) {
this.sortDatanodesInPipeline = sort;
return this;
}
public OmKeyArgs build() {
return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type,
factor, locationInfoList, isMultipartKey, multipartUploadID,
multipartUploadPartNumber, metadata, refreshPipeline, acls);
multipartUploadPartNumber, metadata, refreshPipeline, acls,
sortDatanodesInPipeline);
}
}

View File

@ -787,7 +787,9 @@ public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setDataSize(args.getDataSize()).build();
.setDataSize(args.getDataSize())
.setSortDatanodes(args.getSortDatanodes())
.build();
req.setKeyArgs(keyArgs);
OMRequest omRequest = createOMRequest(Type.LookupKey)
@ -1315,6 +1317,7 @@ public OmKeyInfo lookupFile(OmKeyArgs args)
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setSortDatanodes(args.getSortDatanodes())
.build();
LookupFileRequest lookupFileRequest = LookupFileRequest.newBuilder()
.setKeyArgs(keyArgs)

View File

@ -615,6 +615,7 @@ message KeyArgs {
// value is used in setting creation/modification time depending on the
// request type.
optional uint64 modificationTime = 13;
optional bool sortDatanodes = 14;
}
message KeyLocation {

View File

@ -17,15 +17,18 @@
package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@ -44,10 +47,13 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
@ -91,11 +97,16 @@ private void init() throws Exception {
OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
1, TimeUnit.SECONDS);
conf.setBoolean(
ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, true);
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
conf.setQuietMode(false);
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
StaticMapping.class, DNSToSwitchMapping.class);
StaticMapping.addNodeToRack(NetUtils.normalizeHostNames(
Collections.singleton(HddsUtils.getHostName(conf))).get(0),
"/rack1");
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(6).build();
.setNumDatanodes(10).build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);

View File

@ -42,7 +42,6 @@
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@ -101,7 +100,6 @@
import org.apache.commons.lang3.RandomUtils;
import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE;
import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
import static org.apache.hadoop.ozone.OzoneAcl.AclScope.DEFAULT;
@ -719,81 +717,6 @@ public void testValidateBlockLengthWithCommitKey() throws IOException {
Assert.assertEquals(value.getBytes().length, keyInfo.getDataSize());
}
/**
* Tests get the information of key with network topology awareness enabled.
* @throws IOException
*/
@Test
public void testGetKeyAndFileWithNetworkTopology() throws IOException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
String value = "sample value";
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
String keyName = UUID.randomUUID().toString();
// Write data into a key
OzoneOutputStream out = bucket.createKey(keyName,
value.getBytes().length, ReplicationType.RATIS,
THREE, new HashMap<>());
out.write(value.getBytes());
out.close();
// Since the rpc client is outside of cluster, then getFirstNode should be
// equal to getClosestNode.
OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
builder.setVolumeName(volumeName).setBucketName(bucketName)
.setKeyName(keyName).setRefreshPipeline(true);
// read key with topology aware read enabled(default)
try {
OzoneInputStream is = bucket.readKey(keyName);
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading key should success");
}
// read file with topology aware read enabled(default)
try {
OzoneInputStream is = bucket.readFile(keyName);
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading file should success");
}
// read key with topology aware read disabled
Configuration conf = cluster.getConf();
conf.set(ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, "false");
OzoneClient newClient = OzoneClientFactory.getRpcClient(conf);
ObjectStore newStore = newClient.getObjectStore();
OzoneBucket newBucket =
newStore.getVolume(volumeName).getBucket(bucketName);
try {
OzoneInputStream is = newBucket.readKey(keyName);
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading key should success");
}
// read file with topology aware read disabled
try {
OzoneInputStream is = newBucket.readFile(keyName);
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading file should success");
}
}
@Test
public void testPutKeyRatisOneNode()
throws IOException, OzoneException {

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
@ -63,6 +64,8 @@ public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
conf.setBoolean(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
true);
startCluster(conf);
}
@ -91,11 +94,11 @@ public void testGetKeyAndFileWithNetworkTopology() throws IOException {
String keyName = UUID.randomUUID().toString();
// Write data into a key
OzoneOutputStream out = bucket.createKey(keyName,
try (OzoneOutputStream out = bucket.createKey(keyName,
value.getBytes().length, ReplicationType.RATIS,
THREE, new HashMap<>());
out.write(value.getBytes());
out.close();
THREE, new HashMap<>())) {
out.write(value.getBytes());
}
// Since the rpc client is outside of cluster, then getFirstNode should be
// equal to getClosestNode.
@ -103,48 +106,47 @@ public void testGetKeyAndFileWithNetworkTopology() throws IOException {
builder.setVolumeName(volumeName).setBucketName(bucketName)
.setKeyName(keyName).setRefreshPipeline(true);
// read key with topology aware read enabled(default)
try {
OzoneInputStream is = bucket.readKey(keyName);
// read key with topology aware read enabled
try (OzoneInputStream is = bucket.readKey(keyName)) {
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading key should success");
fail("Read key should succeed");
}
// read file with topology aware read enabled(default)
try {
OzoneInputStream is = bucket.readFile(keyName);
// read file with topology aware read enabled
try (OzoneInputStream is = bucket.readKey(keyName)) {
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading file should success");
fail("Read file should succeed");
}
// read key with topology aware read disabled
conf.set(ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, "false");
OzoneClient newClient = OzoneClientFactory.getRpcClient(conf);
ObjectStore newStore = newClient.getObjectStore();
OzoneBucket newBucket =
newStore.getVolume(volumeName).getBucket(bucketName);
try {
OzoneInputStream is = newBucket.readKey(keyName);
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading key should success");
}
// read file with topology aware read disabled
conf.setBoolean(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
false);
try (OzoneClient newClient = OzoneClientFactory.getRpcClient(conf)) {
ObjectStore newStore = newClient.getObjectStore();
OzoneBucket newBucket =
newStore.getVolume(volumeName).getBucket(bucketName);
try (OzoneInputStream is = newBucket.readKey(keyName)) {
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Read key should succeed");
}
try {
OzoneInputStream is = newBucket.readFile(keyName);
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Reading file should success");
// read file with topology aware read disabled
try (OzoneInputStream is = newBucket.readFile(keyName)) {
byte[] b = new byte[value.getBytes().length];
is.read(b);
Assert.assertTrue(Arrays.equals(b, value.getBytes()));
} catch (OzoneChecksumException e) {
fail("Read file should succeed");
}
}
}
}

View File

@ -694,7 +694,9 @@ public OmKeyInfo lookupKey(OmKeyArgs args, String clientAddress)
});
}
}
sortDatanodeInPipeline(value, clientAddress);
if (args.getSortDatanodes()) {
sortDatanodeInPipeline(value, clientAddress);
}
return value;
} catch (IOException ex) {
LOG.debug("Get key failed for volume:{} bucket:{} key:{}",
@ -1916,7 +1918,9 @@ public OmKeyInfo lookupFile(OmKeyArgs args, String clientAddress)
try {
OzoneFileStatus fileStatus = getFileStatus(args);
if (fileStatus.isFile()) {
sortDatanodeInPipeline(fileStatus.getKeyInfo(), clientAddress);
if (args.getSortDatanodes()) {
sortDatanodeInPipeline(fileStatus.getKeyInfo(), clientAddress);
}
return fileStatus.getKeyInfo();
}
//if key is not of type file or if key is not found we throw an exception

View File

@ -606,6 +606,7 @@ private LookupKeyResponse lookupKey(LookupKeyRequest request)
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setRefreshPipeline(true)
.setSortDatanodesInPipeline(keyArgs.getSortDatanodes())
.build();
OmKeyInfo keyInfo = impl.lookupKey(omKeyArgs);
resp.setKeyInfo(keyInfo.getProtobuf());
@ -1053,6 +1054,7 @@ private LookupFileResponse lookupFile(
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setSortDatanodesInPipeline(keyArgs.getSortDatanodes())
.build();
return LookupFileResponse.newBuilder()
.setKeyInfo(impl.lookupFile(omKeyArgs).getProtobuf())