From 8f9245bc2d2771270488f151b1a41c656bdafc68 Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Thu, 8 Aug 2019 07:59:13 +0800 Subject: [PATCH] HDDS-1865. Use "ozone.network.topology.aware.read" to control both RPC client and server side logic (#1184) --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 6 +- .../hadoop/hdds/scm/XceiverClientManager.java | 7 +- .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 4 - .../apache/hadoop/ozone/OzoneConfigKeys.java | 8 +- .../src/main/resources/ozone-default.xml | 2 +- .../ContainerPlacementPolicyFactory.java | 2 + .../SCMContainerPlacementRackAware.java | 5 +- .../hadoop/ozone/client/rpc/RpcClient.java | 7 ++ .../hadoop/ozone/om/helpers/OmKeyArgs.java | 17 +++- ...ManagerProtocolClientSideTranslatorPB.java | 5 +- .../src/main/proto/OzoneManagerProtocol.proto | 1 + .../rpc/TestFailureHandlingByClient.java | 17 +++- .../rpc/TestOzoneRpcClientAbstract.java | 77 ------------------- .../rpc/TestOzoneRpcClientWithRatis.java | 68 ++++++++-------- .../hadoop/ozone/om/KeyManagerImpl.java | 8 +- .../OzoneManagerRequestHandler.java | 2 + 16 files changed, 104 insertions(+), 132 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 9f99ab58ea..b51b221f4d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -99,9 +99,9 @@ public class XceiverClientGrpc extends XceiverClientSpi { this.metrics = XceiverClientManager.getXceiverClientMetrics(); this.channels = new HashMap<>(); this.asyncStubs = new HashMap<>(); - this.topologyAwareRead = Boolean.parseBoolean(config.get( - ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, - ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT)); + this.topologyAwareRead = config.getBoolean( + OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, + OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT); } /** diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index 57799aab4d..57c567e5e9 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -27,6 +27,7 @@ import com.google.common.cache.RemovalNotification; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneSecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; @@ -102,9 +103,9 @@ public class XceiverClientManager implements Closeable { } } }).build(); - topologyAwareRead = Boolean.parseBoolean(conf.get( - ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, - ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT)); + topologyAwareRead = conf.getBoolean( + OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, + OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT); } @VisibleForTesting diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 212966e0ff..55e0c53944 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -372,10 +372,6 @@ public final class ScmConfigKeys { "ozone.scm.network.topology.schema.file"; public static final String OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT = "network-topology-default.xml"; - public static final String DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED = - "dfs.network.topology.aware.read.enable"; - public static final String DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT = - "false"; public static final String HDDS_TRACING_ENABLED = "hdds.tracing.enabled"; public static final boolean HDDS_TRACING_ENABLED_DEFAULT = true; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 7e716f9778..f8b2973cf6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -1,4 +1,4 @@ - /** +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -29,7 +29,7 @@ import org.apache.ratis.util.TimeDuration; import java.util.concurrent.TimeUnit; - /** +/** * This class contains constants for configuration keys used in Ozone. */ @InterfaceAudience.Public @@ -447,6 +447,10 @@ public final class OzoneConfigKeys { OZONE_FREON_HTTP_KERBEROS_KEYTAB_FILE_KEY = "ozone.freon.http.kerberos.keytab"; + public static final String OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY = + "ozone.network.topology.aware.read"; + public static final boolean OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT = false; + /** * There is no need to instantiate this class. */ diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index abd36041aa..409cc72c8f 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -2363,7 +2363,7 @@ - dfs.network.topology.aware.read.enable + ozone.network.topology.aware.read false OZONE, PERFORMANCE diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java index 6b1e5e2a80..b337f4e995 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java @@ -52,6 +52,8 @@ public final class ContainerPlacementPolicyFactory { try { constructor = placementClass.getDeclaredConstructor(NodeManager.class, Configuration.class, NetworkTopology.class, boolean.class); + LOG.info("Create container placement policy of type " + + placementClass.getCanonicalName()); } catch (NoSuchMethodException e) { String msg = "Failed to find constructor(NodeManager, Configuration, " + "NetworkTopology, boolean) for class " + diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java index e126f27c1f..14d1ffb58c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java @@ -97,7 +97,10 @@ public final class SCMContainerPlacementRackAware extends SCMCommonPolicy { int datanodeCount = networkTopology.getNumOfLeafNode(NetConstants.ROOT); int excludedNodesCount = excludedNodes == null ? 0 : excludedNodes.size(); if (datanodeCount < nodesRequired + excludedNodesCount) { - throw new SCMException("No enough datanodes to choose.", null); + throw new SCMException("No enough datanodes to choose. " + + "TotalNode = " + datanodeCount + + "RequiredNode = " + nodesRequired + + "ExcludedNode = " + excludedNodesCount, null); } List mutableFavoredNodes = favoredNodes; // sanity check of favoredNodes diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 338e216e83..bd41adac04 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -134,6 +134,7 @@ public class RpcClient implements ClientProtocol { private final int maxRetryCount; private final long retryInterval; private Text dtService; + private final boolean topologyAwareReadEnabled; /** * Creates RpcClient instance with the given configuration. @@ -228,6 +229,9 @@ public class RpcClient implements ClientProtocol { OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT); ByteStringHelper.init(isUnsafeByteOperationsEnabled); + topologyAwareReadEnabled = conf.getBoolean( + OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, + OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT); } private InetSocketAddress getScmAddressForClient() throws IOException { @@ -658,6 +662,7 @@ public class RpcClient implements ClientProtocol { .setBucketName(bucketName) .setKeyName(keyName) .setRefreshPipeline(true) + .setSortDatanodesInPipeline(topologyAwareReadEnabled) .build(); OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs); return createInputStream(keyInfo); @@ -721,6 +726,7 @@ public class RpcClient implements ClientProtocol { .setBucketName(bucketName) .setKeyName(keyName) .setRefreshPipeline(true) + .setSortDatanodesInPipeline(topologyAwareReadEnabled) .build(); OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs); @@ -981,6 +987,7 @@ public class RpcClient implements ClientProtocol { .setVolumeName(volumeName) .setBucketName(bucketName) .setKeyName(keyName) + .setSortDatanodesInPipeline(topologyAwareReadEnabled) .build(); OmKeyInfo keyInfo = ozoneManagerClient.lookupFile(keyArgs); return createInputStream(keyInfo); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java index de07d08fb2..6bca3aaa8e 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java @@ -46,6 +46,7 @@ public final class OmKeyArgs implements Auditable { private final int multipartUploadPartNumber; private Map metadata; private boolean refreshPipeline; + private boolean sortDatanodesInPipeline; private List acls; @SuppressWarnings("parameternumber") @@ -54,7 +55,7 @@ public final class OmKeyArgs implements Auditable { List locationInfoList, boolean isMultipart, String uploadID, int partNumber, Map metadataMap, boolean refreshPipeline, - List acls) { + List acls, boolean sortDatanode) { this.volumeName = volumeName; this.bucketName = bucketName; this.keyName = keyName; @@ -68,6 +69,7 @@ public final class OmKeyArgs implements Auditable { this.metadata = metadataMap; this.refreshPipeline = refreshPipeline; this.acls = acls; + this.sortDatanodesInPipeline = sortDatanode; } public boolean getIsMultipartKey() { @@ -134,6 +136,10 @@ public final class OmKeyArgs implements Auditable { return refreshPipeline; } + public boolean getSortDatanodes() { + return sortDatanodesInPipeline; + } + @Override public Map toAuditMap() { Map auditMap = new LinkedHashMap<>(); @@ -174,6 +180,7 @@ public final class OmKeyArgs implements Auditable { private int multipartUploadPartNumber; private Map metadata = new HashMap<>(); private boolean refreshPipeline; + private boolean sortDatanodesInPipeline; private List acls; public Builder setVolumeName(String volume) { @@ -246,10 +253,16 @@ public final class OmKeyArgs implements Auditable { return this; } + public Builder setSortDatanodesInPipeline(boolean sort) { + this.sortDatanodesInPipeline = sort; + return this; + } + public OmKeyArgs build() { return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type, factor, locationInfoList, isMultipartKey, multipartUploadID, - multipartUploadPartNumber, metadata, refreshPipeline, acls); + multipartUploadPartNumber, metadata, refreshPipeline, acls, + sortDatanodesInPipeline); } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index 094e6895a2..f76154871e 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -787,7 +787,9 @@ public final class OzoneManagerProtocolClientSideTranslatorPB .setVolumeName(args.getVolumeName()) .setBucketName(args.getBucketName()) .setKeyName(args.getKeyName()) - .setDataSize(args.getDataSize()).build(); + .setDataSize(args.getDataSize()) + .setSortDatanodes(args.getSortDatanodes()) + .build(); req.setKeyArgs(keyArgs); OMRequest omRequest = createOMRequest(Type.LookupKey) @@ -1315,6 +1317,7 @@ public final class OzoneManagerProtocolClientSideTranslatorPB .setVolumeName(args.getVolumeName()) .setBucketName(args.getBucketName()) .setKeyName(args.getKeyName()) + .setSortDatanodes(args.getSortDatanodes()) .build(); LookupFileRequest lookupFileRequest = LookupFileRequest.newBuilder() .setKeyArgs(keyArgs) diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index 68e9f267ab..43e2548fb4 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -615,6 +615,7 @@ message KeyArgs { // value is used in setting creation/modification time depending on the // request type. optional uint64 modificationTime = 13; + optional bool sortDatanodes = 14; } message KeyLocation { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java index 7ce41a9e89..3c7a25e18b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java @@ -17,15 +17,18 @@ package org.apache.hadoop.ozone.client.rpc; +import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.StaticMapping; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; @@ -44,10 +47,13 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic + .NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; @@ -91,11 +97,16 @@ public class TestFailureHandlingByClient { OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, 1, TimeUnit.SECONDS); conf.setBoolean( - ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, true); + OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true); conf.setQuietMode(false); + conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + StaticMapping.class, DNSToSwitchMapping.class); + StaticMapping.addNodeToRack(NetUtils.normalizeHostNames( + Collections.singleton(HddsUtils.getHostName(conf))).get(0), + "/rack1"); cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(6).build(); + .setNumDatanodes(10).build(); cluster.waitForClusterToBeReady(); //the easiest way to create an open container is creating a key client = OzoneClientFactory.getClient(conf); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index 4e426ba0db..eb2d04897d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -42,7 +42,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.StorageType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -101,7 +100,6 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE; -import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE; import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE; import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS; import static org.apache.hadoop.ozone.OzoneAcl.AclScope.DEFAULT; @@ -719,81 +717,6 @@ public abstract class TestOzoneRpcClientAbstract { Assert.assertEquals(value.getBytes().length, keyInfo.getDataSize()); } - /** - * Tests get the information of key with network topology awareness enabled. - * @throws IOException - */ - @Test - public void testGetKeyAndFileWithNetworkTopology() throws IOException { - String volumeName = UUID.randomUUID().toString(); - String bucketName = UUID.randomUUID().toString(); - - String value = "sample value"; - store.createVolume(volumeName); - OzoneVolume volume = store.getVolume(volumeName); - volume.createBucket(bucketName); - OzoneBucket bucket = volume.getBucket(bucketName); - String keyName = UUID.randomUUID().toString(); - - // Write data into a key - OzoneOutputStream out = bucket.createKey(keyName, - value.getBytes().length, ReplicationType.RATIS, - THREE, new HashMap<>()); - out.write(value.getBytes()); - out.close(); - - // Since the rpc client is outside of cluster, then getFirstNode should be - // equal to getClosestNode. - OmKeyArgs.Builder builder = new OmKeyArgs.Builder(); - builder.setVolumeName(volumeName).setBucketName(bucketName) - .setKeyName(keyName).setRefreshPipeline(true); - - // read key with topology aware read enabled(default) - try { - OzoneInputStream is = bucket.readKey(keyName); - byte[] b = new byte[value.getBytes().length]; - is.read(b); - Assert.assertTrue(Arrays.equals(b, value.getBytes())); - } catch (OzoneChecksumException e) { - fail("Reading key should success"); - } - // read file with topology aware read enabled(default) - try { - OzoneInputStream is = bucket.readFile(keyName); - byte[] b = new byte[value.getBytes().length]; - is.read(b); - Assert.assertTrue(Arrays.equals(b, value.getBytes())); - } catch (OzoneChecksumException e) { - fail("Reading file should success"); - } - - // read key with topology aware read disabled - Configuration conf = cluster.getConf(); - conf.set(ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, "false"); - OzoneClient newClient = OzoneClientFactory.getRpcClient(conf); - ObjectStore newStore = newClient.getObjectStore(); - OzoneBucket newBucket = - newStore.getVolume(volumeName).getBucket(bucketName); - try { - OzoneInputStream is = newBucket.readKey(keyName); - byte[] b = new byte[value.getBytes().length]; - is.read(b); - Assert.assertTrue(Arrays.equals(b, value.getBytes())); - } catch (OzoneChecksumException e) { - fail("Reading key should success"); - } - // read file with topology aware read disabled - - try { - OzoneInputStream is = newBucket.readFile(keyName); - byte[] b = new byte[value.getBytes().length]; - is.read(b); - Assert.assertTrue(Arrays.equals(b, value.getBytes())); - } catch (OzoneChecksumException e) { - fail("Reading file should success"); - } - } - @Test public void testPutKeyRatisOneNode() throws IOException, OzoneException { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java index dc6e40744a..73a7de5147 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java @@ -26,6 +26,7 @@ import java.util.UUID; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; @@ -63,6 +64,8 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract { conf = new OzoneConfiguration(); conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1); conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true); + conf.setBoolean(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, + true); startCluster(conf); } @@ -91,11 +94,11 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract { String keyName = UUID.randomUUID().toString(); // Write data into a key - OzoneOutputStream out = bucket.createKey(keyName, + try (OzoneOutputStream out = bucket.createKey(keyName, value.getBytes().length, ReplicationType.RATIS, - THREE, new HashMap<>()); - out.write(value.getBytes()); - out.close(); + THREE, new HashMap<>())) { + out.write(value.getBytes()); + } // Since the rpc client is outside of cluster, then getFirstNode should be // equal to getClosestNode. @@ -103,48 +106,47 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract { builder.setVolumeName(volumeName).setBucketName(bucketName) .setKeyName(keyName).setRefreshPipeline(true); - // read key with topology aware read enabled(default) - try { - OzoneInputStream is = bucket.readKey(keyName); + // read key with topology aware read enabled + try (OzoneInputStream is = bucket.readKey(keyName)) { byte[] b = new byte[value.getBytes().length]; is.read(b); Assert.assertTrue(Arrays.equals(b, value.getBytes())); } catch (OzoneChecksumException e) { - fail("Reading key should success"); + fail("Read key should succeed"); } - // read file with topology aware read enabled(default) - try { - OzoneInputStream is = bucket.readFile(keyName); + + // read file with topology aware read enabled + try (OzoneInputStream is = bucket.readKey(keyName)) { byte[] b = new byte[value.getBytes().length]; is.read(b); Assert.assertTrue(Arrays.equals(b, value.getBytes())); } catch (OzoneChecksumException e) { - fail("Reading file should success"); + fail("Read file should succeed"); } // read key with topology aware read disabled - conf.set(ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, "false"); - OzoneClient newClient = OzoneClientFactory.getRpcClient(conf); - ObjectStore newStore = newClient.getObjectStore(); - OzoneBucket newBucket = - newStore.getVolume(volumeName).getBucket(bucketName); - try { - OzoneInputStream is = newBucket.readKey(keyName); - byte[] b = new byte[value.getBytes().length]; - is.read(b); - Assert.assertTrue(Arrays.equals(b, value.getBytes())); - } catch (OzoneChecksumException e) { - fail("Reading key should success"); - } - // read file with topology aware read disabled + conf.setBoolean(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, + false); + try (OzoneClient newClient = OzoneClientFactory.getRpcClient(conf)) { + ObjectStore newStore = newClient.getObjectStore(); + OzoneBucket newBucket = + newStore.getVolume(volumeName).getBucket(bucketName); + try (OzoneInputStream is = newBucket.readKey(keyName)) { + byte[] b = new byte[value.getBytes().length]; + is.read(b); + Assert.assertTrue(Arrays.equals(b, value.getBytes())); + } catch (OzoneChecksumException e) { + fail("Read key should succeed"); + } - try { - OzoneInputStream is = newBucket.readFile(keyName); - byte[] b = new byte[value.getBytes().length]; - is.read(b); - Assert.assertTrue(Arrays.equals(b, value.getBytes())); - } catch (OzoneChecksumException e) { - fail("Reading file should success"); + // read file with topology aware read disabled + try (OzoneInputStream is = newBucket.readFile(keyName)) { + byte[] b = new byte[value.getBytes().length]; + is.read(b); + Assert.assertTrue(Arrays.equals(b, value.getBytes())); + } catch (OzoneChecksumException e) { + fail("Read file should succeed"); + } } } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 557904a02f..3685bc7796 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -694,7 +694,9 @@ public class KeyManagerImpl implements KeyManager { }); } } - sortDatanodeInPipeline(value, clientAddress); + if (args.getSortDatanodes()) { + sortDatanodeInPipeline(value, clientAddress); + } return value; } catch (IOException ex) { LOG.debug("Get key failed for volume:{} bucket:{} key:{}", @@ -1916,7 +1918,9 @@ public class KeyManagerImpl implements KeyManager { try { OzoneFileStatus fileStatus = getFileStatus(args); if (fileStatus.isFile()) { - sortDatanodeInPipeline(fileStatus.getKeyInfo(), clientAddress); + if (args.getSortDatanodes()) { + sortDatanodeInPipeline(fileStatus.getKeyInfo(), clientAddress); + } return fileStatus.getKeyInfo(); } //if key is not of type file or if key is not found we throw an exception diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java index 53ab6fc703..7c9f12643f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java @@ -606,6 +606,7 @@ public class OzoneManagerRequestHandler implements RequestHandler { .setBucketName(keyArgs.getBucketName()) .setKeyName(keyArgs.getKeyName()) .setRefreshPipeline(true) + .setSortDatanodesInPipeline(keyArgs.getSortDatanodes()) .build(); OmKeyInfo keyInfo = impl.lookupKey(omKeyArgs); resp.setKeyInfo(keyInfo.getProtobuf()); @@ -1053,6 +1054,7 @@ public class OzoneManagerRequestHandler implements RequestHandler { .setVolumeName(keyArgs.getVolumeName()) .setBucketName(keyArgs.getBucketName()) .setKeyName(keyArgs.getKeyName()) + .setSortDatanodesInPipeline(keyArgs.getSortDatanodes()) .build(); return LookupFileResponse.newBuilder() .setKeyInfo(impl.lookupFile(omKeyArgs).getProtobuf())