HDDS-227. Use Grpc as the default transport protocol for Standalone pipeline. Contributed by chencan.

This commit is contained in:
Márton Elek 2018-08-27 16:07:55 +02:00
parent 744ce200d2
commit a813fd0215
7 changed files with 13 additions and 109 deletions

View File

@ -60,7 +60,6 @@ public class XceiverClientManager implements Closeable {
private final Configuration conf; private final Configuration conf;
private final Cache<Long, XceiverClientSpi> clientCache; private final Cache<Long, XceiverClientSpi> clientCache;
private final boolean useRatis; private final boolean useRatis;
private final boolean useGrpc;
private static XceiverClientMetrics metrics; private static XceiverClientMetrics metrics;
/** /**
@ -78,8 +77,6 @@ public XceiverClientManager(Configuration conf) {
this.useRatis = conf.getBoolean( this.useRatis = conf.getBoolean(
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
this.useGrpc = conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT);
this.conf = conf; this.conf = conf;
this.clientCache = CacheBuilder.newBuilder() this.clientCache = CacheBuilder.newBuilder()
.expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS) .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
@ -153,8 +150,7 @@ public XceiverClientSpi call() throws Exception {
client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf); client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
break; break;
case STAND_ALONE: case STAND_ALONE:
client = useGrpc ? new XceiverClientGrpc(pipeline, conf) : client = new XceiverClientGrpc(pipeline, conf);
new XceiverClient(pipeline, conf);
break; break;
case CHAINED: case CHAINED:
default: default:

View File

@ -49,10 +49,6 @@ public final class ScmConfigKeys {
= "dfs.container.ratis.enabled"; = "dfs.container.ratis.enabled";
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
= false; = false;
public static final String DFS_CONTAINER_GRPC_ENABLED_KEY
= "dfs.container.grpc.enabled";
public static final boolean DFS_CONTAINER_GRPC_ENABLED_DEFAULT
= false;
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY
= "dfs.container.ratis.rpc.type"; = "dfs.container.ratis.rpc.type";
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT

View File

@ -94,15 +94,6 @@
the replication pipeline supported by ozone. the replication pipeline supported by ozone.
</description> </description>
</property> </property>
<property>
<name>dfs.container.grpc.enabled</name>
<value>false</value>
<tag>OZONE, MANAGEMENT, PIPELINE, RATIS</tag>
<description>Ozone supports different kinds of replication pipelines
protocols. grpc is one of the replication pipeline protocol supported by
ozone.
</description>
</property>
<property> <property>
<name>dfs.container.ratis.ipc</name> <name>dfs.container.ratis.ipc</name>
<value>9858</value> <value>9858</value>

View File

@ -25,12 +25,10 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
@ -76,17 +74,12 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
this.config = conf; this.config = conf;
this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf); this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
this.containerSet = new ContainerSet(); this.containerSet = new ContainerSet();
boolean useGrpc = this.config.getBoolean(
ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT);
buildContainerSet(); buildContainerSet();
hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet, hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
context); context);
server = new XceiverServerSpi[]{ server = new XceiverServerSpi[]{
useGrpc ? new XceiverServerGrpc(datanodeDetails, this.config, this new XceiverServerGrpc(datanodeDetails, this.config, this
.hddsDispatcher) : .hddsDispatcher),
new XceiverServer(datanodeDetails,
this.config, this.hddsDispatcher),
XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this
.config, hddsDispatcher) .config, hddsDispatcher)
}; };

View File

@ -26,10 +26,9 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.XceiverClient; import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -77,7 +76,7 @@ public void testCreateOzoneContainer() throws Exception {
container.getDispatcher().setScmId(UUID.randomUUID().toString()); container.getDispatcher().setScmId(UUID.randomUUID().toString());
container.start(); container.start();
XceiverClient client = new XceiverClient(pipeline, conf); XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
client.connect(); client.connect();
createContainerForTesting(client, containerID); createContainerForTesting(client, containerID);
} finally { } finally {
@ -119,7 +118,7 @@ public void testOzoneContainerViaDataNode() throws Exception {
cluster.waitForClusterToBeReady(); cluster.waitForClusterToBeReady();
// This client talks to ozone container via datanode. // This client talks to ozone container via datanode.
XceiverClient client = new XceiverClient(pipeline, conf); XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
runTestOzoneContainerViaDataNode(containerID, client); runTestOzoneContainerViaDataNode(containerID, client);
} finally { } finally {
@ -215,7 +214,7 @@ static void runTestOzoneContainerViaDataNode(
@Test @Test
public void testBothGetandPutSmallFile() throws Exception { public void testBothGetandPutSmallFile() throws Exception {
MiniOzoneCluster cluster = null; MiniOzoneCluster cluster = null;
XceiverClient client = null; XceiverClientGrpc client = null;
try { try {
OzoneConfiguration conf = newOzoneConfiguration(); OzoneConfiguration conf = newOzoneConfiguration();
@ -269,7 +268,7 @@ static void runTestBothGetandPutSmallFile(
@Test @Test
public void testCloseContainer() throws Exception { public void testCloseContainer() throws Exception {
MiniOzoneCluster cluster = null; MiniOzoneCluster cluster = null;
XceiverClient client = null; XceiverClientGrpc client = null;
ContainerProtos.ContainerCommandResponseProto response; ContainerProtos.ContainerCommandResponseProto response;
ContainerProtos.ContainerCommandRequestProto ContainerProtos.ContainerCommandRequestProto
writeChunkRequest, putKeyRequest, request; writeChunkRequest, putKeyRequest, request;
@ -365,7 +364,7 @@ public void testCloseContainer() throws Exception {
@Test @Test
public void testDeleteContainer() throws Exception { public void testDeleteContainer() throws Exception {
MiniOzoneCluster cluster = null; MiniOzoneCluster cluster = null;
XceiverClient client = null; XceiverClientGrpc client = null;
ContainerProtos.ContainerCommandResponseProto response; ContainerProtos.ContainerCommandResponseProto response;
ContainerProtos.ContainerCommandRequestProto request, ContainerProtos.ContainerCommandRequestProto request,
writeChunkRequest, putKeyRequest; writeChunkRequest, putKeyRequest;
@ -485,7 +484,7 @@ static void runAsyncTests(
@Test @Test
public void testXcieverClientAsync() throws Exception { public void testXcieverClientAsync() throws Exception {
MiniOzoneCluster cluster = null; MiniOzoneCluster cluster = null;
XceiverClient client = null; XceiverClientGrpc client = null;
try { try {
OzoneConfiguration conf = newOzoneConfiguration(); OzoneConfiguration conf = newOzoneConfiguration();
@ -503,38 +502,8 @@ public void testXcieverClientAsync() throws Exception {
} }
} }
@Test private static XceiverClientGrpc createClientForTesting(
public void testInvalidRequest() throws Exception { OzoneConfiguration conf) throws Exception {
MiniOzoneCluster cluster = null;
XceiverClient client;
ContainerProtos.ContainerCommandRequestProto request;
try {
OzoneConfiguration conf = newOzoneConfiguration();
client = createClientForTesting(conf);
cluster = MiniOzoneCluster.newBuilder(conf)
.setRandomContainerPort(false)
.build();
cluster.waitForClusterToBeReady();
client.connect();
// Send a request without traceId.
long containerID = ContainerTestHelper.getTestContainerID();
request = ContainerTestHelper
.getRequestWithoutTraceId(client.getPipeline(), containerID);
client.sendCommand(request);
Assert.fail("IllegalArgumentException expected");
} catch(IllegalArgumentException iae){
GenericTestUtils.assertExceptionContains("Invalid trace ID", iae);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private static XceiverClient createClientForTesting(OzoneConfiguration conf)
throws Exception {
// Start ozone container Via Datanode create. // Start ozone container Via Datanode create.
Pipeline pipeline = Pipeline pipeline =
ContainerTestHelper.createSingleNodePipeline(); ContainerTestHelper.createSingleNodePipeline();
@ -543,7 +512,7 @@ private static XceiverClient createClientForTesting(OzoneConfiguration conf)
.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()); .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue());
// This client talks to ozone container via datanode. // This client talks to ozone container via datanode.
return new XceiverClient(pipeline, conf); return new XceiverClientGrpc(pipeline, conf);
} }
private static void createContainerForTesting(XceiverClientSpi client, private static void createContainerForTesting(XceiverClientSpi client,

View File

@ -19,7 +19,6 @@
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
@ -36,12 +35,7 @@
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import static org.apache.hadoop.hdds.scm import static org.apache.hadoop.hdds.scm
.ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY; .ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
@ -49,23 +43,12 @@
/** /**
* Test for XceiverClientManager caching and eviction. * Test for XceiverClientManager caching and eviction.
*/ */
@RunWith(Parameterized.class)
public class TestXceiverClientManager { public class TestXceiverClientManager {
private static OzoneConfiguration config; private static OzoneConfiguration config;
private static MiniOzoneCluster cluster; private static MiniOzoneCluster cluster;
private static StorageContainerLocationProtocolClientSideTranslatorPB private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient; storageContainerLocationClient;
private static String containerOwner = "OZONE"; private static String containerOwner = "OZONE";
private static boolean shouldUseGrpc;
@Parameterized.Parameters
public static Collection<Object[]> withGrpc() {
return Arrays.asList(new Object[][] {{false}, {true}});
}
public TestXceiverClientManager(boolean useGrpc) {
shouldUseGrpc = useGrpc;
}
@Rule @Rule
public ExpectedException exception = ExpectedException.none(); public ExpectedException exception = ExpectedException.none();
@ -73,8 +56,6 @@ public TestXceiverClientManager(boolean useGrpc) {
@Before @Before
public void init() throws Exception { public void init() throws Exception {
config = new OzoneConfiguration(); config = new OzoneConfiguration();
config.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
shouldUseGrpc);
cluster = MiniOzoneCluster.newBuilder(config) cluster = MiniOzoneCluster.newBuilder(config)
.setNumDatanodes(3) .setNumDatanodes(3)
.build(); .build();
@ -94,8 +75,6 @@ public void shutdown() {
@Test @Test
public void testCaching() throws IOException { public void testCaching() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = new OzoneConfiguration();
conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
shouldUseGrpc);
XceiverClientManager clientManager = new XceiverClientManager(conf); XceiverClientManager clientManager = new XceiverClientManager(conf);
ContainerWithPipeline container1 = storageContainerLocationClient ContainerWithPipeline container1 = storageContainerLocationClient
@ -129,8 +108,6 @@ public void testCaching() throws IOException {
public void testFreeByReference() throws IOException { public void testFreeByReference() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
shouldUseGrpc);
XceiverClientManager clientManager = new XceiverClientManager(conf); XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<Long, XceiverClientSpi> cache = Cache<Long, XceiverClientSpi> cache =
clientManager.getClientCache(); clientManager.getClientCache();
@ -186,8 +163,6 @@ public void testFreeByReference() throws IOException {
public void testFreeByEviction() throws IOException { public void testFreeByEviction() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1); conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
shouldUseGrpc);
XceiverClientManager clientManager = new XceiverClientManager(conf); XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<Long, XceiverClientSpi> cache = Cache<Long, XceiverClientSpi> cache =
clientManager.getClientCache(); clientManager.getClientCache();

View File

@ -29,7 +29,6 @@
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -72,8 +71,6 @@
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
@ -83,7 +80,6 @@
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Collection;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -102,7 +98,6 @@
/** /**
* Test Ozone Key Lifecycle. * Test Ozone Key Lifecycle.
*/ */
@RunWith(Parameterized.class)
public class TestKeys { public class TestKeys {
/** /**
* Set the timeout for every test. * Set the timeout for every test.
@ -117,16 +112,7 @@ public class TestKeys {
private static long currentTime; private static long currentTime;
private static ReplicationFactor replicationFactor = ReplicationFactor.ONE; private static ReplicationFactor replicationFactor = ReplicationFactor.ONE;
private static ReplicationType replicationType = ReplicationType.STAND_ALONE; private static ReplicationType replicationType = ReplicationType.STAND_ALONE;
private static boolean shouldUseGrpc;
@Parameterized.Parameters
public static Collection<Object[]> withGrpc() {
return Arrays.asList(new Object[][] {{false}, {true}});
}
public TestKeys(boolean useGrpc) {
shouldUseGrpc = useGrpc;
}
/** /**
* Create a MiniDFSCluster for testing. * Create a MiniDFSCluster for testing.
@ -141,8 +127,6 @@ public void init() throws Exception {
conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL, conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
1000, TimeUnit.MILLISECONDS); 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, TimeUnit.SECONDS); conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, TimeUnit.SECONDS);
conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
shouldUseGrpc);
path = GenericTestUtils.getTempPath(TestKeys.class.getSimpleName()); path = GenericTestUtils.getTempPath(TestKeys.class.getSimpleName());
Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG); Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);