HDDS-2107. Datanodes should retry forever to connect to SCM in an unsecure environment (#1424)
This commit is contained in:
parent
4222b62f2b
commit
66bd1681f8
@ -18,6 +18,8 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.metrics2.util.MBeans;
|
import org.apache.hadoop.metrics2.util.MBeans;
|
||||||
@ -38,6 +40,7 @@
|
|||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
@ -139,10 +142,14 @@ public void addSCMServer(InetSocketAddress address) throws IOException {
|
|||||||
long version =
|
long version =
|
||||||
RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
|
RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
|
||||||
|
|
||||||
StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProxy(
|
RetryPolicy retryPolicy =
|
||||||
|
RetryPolicies.retryForeverWithFixedSleep(
|
||||||
|
1000, TimeUnit.MILLISECONDS);
|
||||||
|
StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
|
||||||
StorageContainerDatanodeProtocolPB.class, version,
|
StorageContainerDatanodeProtocolPB.class, version,
|
||||||
address, UserGroupInformation.getCurrentUser(), conf,
|
address, UserGroupInformation.getCurrentUser(), conf,
|
||||||
NetUtils.getDefaultSocketFactory(conf), getRpcTimeout());
|
NetUtils.getDefaultSocketFactory(conf), getRpcTimeout(),
|
||||||
|
retryPolicy).getProxy();
|
||||||
|
|
||||||
StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
|
StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
|
||||||
new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
|
new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
|
||||||
|
@ -119,7 +119,7 @@ public void stop() {
|
|||||||
* transactions or not.
|
* transactions or not.
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@Test(timeout = 300_000)
|
@Test(timeout = 500_000)
|
||||||
public void testDoubleBuffer() throws Exception {
|
public void testDoubleBuffer() throws Exception {
|
||||||
// This test checks whether count in tables are correct or not.
|
// This test checks whether count in tables are correct or not.
|
||||||
testDoubleBuffer(1, 10);
|
testDoubleBuffer(1, 10);
|
||||||
@ -397,7 +397,7 @@ public void testDoubleBuffer(int iterations, int bucketCount)
|
|||||||
}
|
}
|
||||||
return count == iterations;
|
return count == iterations;
|
||||||
|
|
||||||
}, 300, 40000);
|
}, 300, 300000);
|
||||||
|
|
||||||
|
|
||||||
GenericTestUtils.waitFor(() -> {
|
GenericTestUtils.waitFor(() -> {
|
||||||
@ -409,7 +409,7 @@ public void testDoubleBuffer(int iterations, int bucketCount)
|
|||||||
fail("testDoubleBuffer failed");
|
fail("testDoubleBuffer failed");
|
||||||
}
|
}
|
||||||
return count == bucketCount * iterations;
|
return count == bucketCount * iterations;
|
||||||
}, 300, 40000);
|
}, 300, 300000);
|
||||||
|
|
||||||
Assert.assertTrue(doubleBuffer.getFlushIterations() > 0);
|
Assert.assertTrue(doubleBuffer.getFlushIterations() > 0);
|
||||||
} finally {
|
} finally {
|
||||||
|
Loading…
Reference in New Issue
Block a user