HDDS-109. Add reconnect logic for XceiverClientGrpc.

Contributed by Lokesh Jain.
This commit is contained in:
Anu Engineer 2018-06-13 09:50:10 -07:00
parent 8e7548d33b
commit 43baa036ae
3 changed files with 48 additions and 10 deletions

View File

@ -55,6 +55,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
private XceiverClientMetrics metrics; private XceiverClientMetrics metrics;
private ManagedChannel channel; private ManagedChannel channel;
private final Semaphore semaphore; private final Semaphore semaphore;
private boolean closed = false;
/** /**
* Constructs a client that can communicate with the Container framework on * Constructs a client that can communicate with the Container framework on
@ -105,6 +106,7 @@ public boolean isConnected() {
@Override @Override
public void close() { public void close() {
closed = true;
channel.shutdownNow(); channel.shutdownNow();
try { try {
channel.awaitTermination(60, TimeUnit.MINUTES); channel.awaitTermination(60, TimeUnit.MINUTES);
@ -153,6 +155,14 @@ public ContainerCommandResponseProto sendCommand(
public CompletableFuture<ContainerCommandResponseProto> public CompletableFuture<ContainerCommandResponseProto>
sendCommandAsync(ContainerCommandRequestProto request) sendCommandAsync(ContainerCommandRequestProto request)
throws IOException, ExecutionException, InterruptedException { throws IOException, ExecutionException, InterruptedException {
if(closed){
throw new IOException("This channel is not connected.");
}
if(channel == null || !isConnected()) {
reconnect();
}
final CompletableFuture<ContainerCommandResponseProto> replyFuture = final CompletableFuture<ContainerCommandResponseProto> replyFuture =
new CompletableFuture<>(); new CompletableFuture<>();
semaphore.acquire(); semaphore.acquire();
@ -192,6 +202,19 @@ public void onCompleted() {
return replyFuture; return replyFuture;
} }
private void reconnect() throws IOException {
try {
connect();
} catch (Exception e) {
LOG.error("Error while connecting: ", e);
throw new IOException(e);
}
if (channel == null || !isConnected()) {
throw new IOException("This channel is not connected.");
}
}
/** /**
* Create a pipeline. * Create a pipeline.
* *

View File

@ -163,8 +163,7 @@ public void testFreeByReference() throws IOException {
// and any container operations should fail // and any container operations should fail
clientManager.releaseClient(client1); clientManager.releaseClient(client1);
String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" : String expectedMessage = "This channel is not connected.";
"This channel is not connected.";
try { try {
ContainerProtocolCalls.createContainer(client1, ContainerProtocolCalls.createContainer(client1,
container1.getContainerID(), traceID1); container1.getContainerID(), traceID1);
@ -213,8 +212,7 @@ public void testFreeByEviction() throws IOException {
// Any container operation should now fail // Any container operation should now fail
String traceID2 = "trace" + RandomStringUtils.randomNumeric(4); String traceID2 = "trace" + RandomStringUtils.randomNumeric(4);
String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" : String expectedMessage = "This channel is not connected.";
"This channel is not connected.";
try { try {
ContainerProtocolCalls.createContainer(client1, ContainerProtocolCalls.createContainer(client1,
container1.getContainerID(), traceID2); container1.getContainerID(), traceID2);

View File

@ -29,6 +29,7 @@
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -62,12 +63,14 @@
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.junit.AfterClass; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
@ -77,6 +80,7 @@
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Collection;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -93,6 +97,7 @@
/** /**
* Test Ozone Key Lifecycle. * Test Ozone Key Lifecycle.
*/ */
@RunWith(Parameterized.class)
public class TestKeys { public class TestKeys {
/** /**
* Set the timeout for every test. * Set the timeout for every test.
@ -107,19 +112,31 @@ public class TestKeys {
private static long currentTime; private static long currentTime;
private static ReplicationFactor replicationFactor = ReplicationFactor.ONE; private static ReplicationFactor replicationFactor = ReplicationFactor.ONE;
private static ReplicationType replicationType = ReplicationType.STAND_ALONE; private static ReplicationType replicationType = ReplicationType.STAND_ALONE;
private static boolean shouldUseGrpc;
@Parameterized.Parameters
public static Collection<Object[]> withGrpc() {
return Arrays.asList(new Object[][] {{false}, {true}});
}
public TestKeys(boolean useGrpc) {
shouldUseGrpc = useGrpc;
}
/** /**
* Create a MiniDFSCluster for testing. * Create a MiniDFSCluster for testing.
* *
* @throws IOException * @throws IOException
*/ */
@BeforeClass @Before
public static void init() throws Exception { public void init() throws Exception {
conf = new OzoneConfiguration(); conf = new OzoneConfiguration();
// Set short block deleting service interval to speed up deletions. // Set short block deleting service interval to speed up deletions.
conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL, conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
1000, TimeUnit.MILLISECONDS); 1000, TimeUnit.MILLISECONDS);
conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
shouldUseGrpc);
path = GenericTestUtils.getTempPath(TestKeys.class.getSimpleName()); path = GenericTestUtils.getTempPath(TestKeys.class.getSimpleName());
Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG); Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
@ -133,8 +150,8 @@ public static void init() throws Exception {
/** /**
* shutdown MiniDFSCluster. * shutdown MiniDFSCluster.
*/ */
@AfterClass @After
public static void shutdown() { public void shutdown() {
if (ozoneCluster != null) { if (ozoneCluster != null) {
ozoneCluster.shutdown(); ozoneCluster.shutdown();
} }