HDFS-1330 and HADOOP-6889. Added additional unit tests. Contributed by John George.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1163463 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8c9c8c72cc
commit
1cd3791172
@ -1092,6 +1092,7 @@ Release 0.22.0 - Unreleased
|
|||||||
(jghoman)
|
(jghoman)
|
||||||
|
|
||||||
HDFS-1330. Make RPCs to DataNodes timeout. (hairong)
|
HDFS-1330. Make RPCs to DataNodes timeout. (hairong)
|
||||||
|
Added additional unit tests per HADOOP-6889. (John George via mattf)
|
||||||
|
|
||||||
HDFS-202. HDFS support of listLocatedStatus introduced in HADOOP-6870.
|
HDFS-202. HDFS support of listLocatedStatus introduced in HADOOP-6870.
|
||||||
HDFS piggyback block locations to each file status when listing a
|
HDFS piggyback block locations to each file status when listing a
|
||||||
|
@ -25,7 +25,12 @@
|
|||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.io.LongWritable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
@ -44,6 +49,8 @@
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
@ -52,6 +59,11 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.ipc.Client;
|
||||||
|
import org.apache.hadoop.ipc.ProtocolSignature;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.ipc.Server;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.mockito.internal.stubbing.answers.ThrowsException;
|
import org.mockito.internal.stubbing.answers.ThrowsException;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
@ -61,9 +73,51 @@
|
|||||||
* properly in case of errors.
|
* properly in case of errors.
|
||||||
*/
|
*/
|
||||||
public class TestDFSClientRetries extends TestCase {
|
public class TestDFSClientRetries extends TestCase {
|
||||||
|
private static final String ADDRESS = "0.0.0.0";
|
||||||
|
final static private int PING_INTERVAL = 1000;
|
||||||
|
final static private int MIN_SLEEP_TIME = 1000;
|
||||||
public static final Log LOG =
|
public static final Log LOG =
|
||||||
LogFactory.getLog(TestDFSClientRetries.class.getName());
|
LogFactory.getLog(TestDFSClientRetries.class.getName());
|
||||||
|
final static private Configuration conf = new HdfsConfiguration();
|
||||||
|
|
||||||
|
private static class TestServer extends Server {
|
||||||
|
private boolean sleep;
|
||||||
|
private Class<? extends Writable> responseClass;
|
||||||
|
|
||||||
|
public TestServer(int handlerCount, boolean sleep) throws IOException {
|
||||||
|
this(handlerCount, sleep, LongWritable.class, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestServer(int handlerCount, boolean sleep,
|
||||||
|
Class<? extends Writable> paramClass,
|
||||||
|
Class<? extends Writable> responseClass)
|
||||||
|
throws IOException {
|
||||||
|
super(ADDRESS, 0, paramClass, handlerCount, conf);
|
||||||
|
this.sleep = sleep;
|
||||||
|
this.responseClass = responseClass;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Writable call(Class<?> protocol, Writable param, long receiveTime)
|
||||||
|
throws IOException {
|
||||||
|
if (sleep) {
|
||||||
|
// sleep a bit
|
||||||
|
try {
|
||||||
|
Thread.sleep(PING_INTERVAL + MIN_SLEEP_TIME);
|
||||||
|
} catch (InterruptedException e) {}
|
||||||
|
}
|
||||||
|
if (responseClass != null) {
|
||||||
|
try {
|
||||||
|
return responseClass.newInstance();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return param; // echo param as result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// writes 'len' bytes of data to out.
|
// writes 'len' bytes of data to out.
|
||||||
private static void writeData(OutputStream out, int len) throws IOException {
|
private static void writeData(OutputStream out, int len) throws IOException {
|
||||||
byte [] buf = new byte[4096*16];
|
byte [] buf = new byte[4096*16];
|
||||||
@ -80,8 +134,6 @@ private static void writeData(OutputStream out, int len) throws IOException {
|
|||||||
*/
|
*/
|
||||||
public void testWriteTimeoutAtDataNode() throws IOException,
|
public void testWriteTimeoutAtDataNode() throws IOException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
Configuration conf = new HdfsConfiguration();
|
|
||||||
|
|
||||||
final int writeTimeout = 100; //milliseconds.
|
final int writeTimeout = 100; //milliseconds.
|
||||||
// set a very short write timeout for datanode, so that tests runs fast.
|
// set a very short write timeout for datanode, so that tests runs fast.
|
||||||
conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, writeTimeout);
|
conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, writeTimeout);
|
||||||
@ -136,7 +188,6 @@ public void testNotYetReplicatedErrors() throws IOException
|
|||||||
{
|
{
|
||||||
final String exceptionMsg = "Nope, not replicated yet...";
|
final String exceptionMsg = "Nope, not replicated yet...";
|
||||||
final int maxRetries = 1; // Allow one retry (total of two calls)
|
final int maxRetries = 1; // Allow one retry (total of two calls)
|
||||||
Configuration conf = new HdfsConfiguration();
|
|
||||||
conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, maxRetries);
|
conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, maxRetries);
|
||||||
|
|
||||||
NameNode mockNN = mock(NameNode.class);
|
NameNode mockNN = mock(NameNode.class);
|
||||||
@ -182,7 +233,6 @@ public void testFailuresArePerOperation() throws Exception
|
|||||||
long fileSize = 4096;
|
long fileSize = 4096;
|
||||||
Path file = new Path("/testFile");
|
Path file = new Path("/testFile");
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
// Set short retry timeout so this test runs faster
|
// Set short retry timeout so this test runs faster
|
||||||
conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
|
conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
@ -379,7 +429,6 @@ private boolean busyTest(int xcievers, int threads, int fileLen, int timeWin, in
|
|||||||
long blockSize = 128*1024*1024; // DFS block size
|
long blockSize = 128*1024*1024; // DFS block size
|
||||||
int bufferSize = 4096;
|
int bufferSize = 4096;
|
||||||
|
|
||||||
Configuration conf = new HdfsConfiguration();
|
|
||||||
conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, xcievers);
|
conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, xcievers);
|
||||||
conf.setInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
|
conf.setInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
|
||||||
retries);
|
retries);
|
||||||
@ -540,7 +589,6 @@ public void testGetFileChecksum() throws Exception {
|
|||||||
final String f = "/testGetFileChecksum";
|
final String f = "/testGetFileChecksum";
|
||||||
final Path p = new Path(f);
|
final Path p = new Path(f);
|
||||||
|
|
||||||
final Configuration conf = new Configuration();
|
|
||||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
@ -566,5 +614,39 @@ public void testGetFileChecksum() throws Exception {
|
|||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Test that timeout occurs when DN does not respond to RPC.
|
||||||
|
* Start up a server and ask it to sleep for n seconds. Make an
|
||||||
|
* RPC to the server and set rpcTimeout to less than n and ensure
|
||||||
|
* that socketTimeoutException is obtained
|
||||||
|
*/
|
||||||
|
public void testClientDNProtocolTimeout() throws IOException {
|
||||||
|
final Server server = new TestServer(1, true);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
|
DatanodeID fakeDnId = new DatanodeID(
|
||||||
|
"localhost:" + addr.getPort(), "fake-storage", 0, addr.getPort());
|
||||||
|
|
||||||
|
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
|
||||||
|
LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[0]);
|
||||||
|
|
||||||
|
ClientDatanodeProtocol proxy = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
proxy = DFSUtil.createClientDatanodeProtocolProxy(
|
||||||
|
fakeDnId, conf, 500, fakeBlock);
|
||||||
|
|
||||||
|
proxy.getReplicaVisibleLength(null);
|
||||||
|
fail ("Did not get expected exception: SocketTimeoutException");
|
||||||
|
} catch (SocketTimeoutException e) {
|
||||||
|
LOG.info("Got the expected Exception: SocketTimeoutException");
|
||||||
|
} finally {
|
||||||
|
if (proxy != null) {
|
||||||
|
RPC.stopProxy(proxy);
|
||||||
|
}
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,6 +22,20 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.io.LongWritable;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ipc.Client;
|
||||||
|
import org.apache.hadoop.ipc.ProtocolSignature;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.ipc.Server;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@ -38,6 +52,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||||
@ -48,6 +63,50 @@
|
|||||||
* This tests InterDataNodeProtocol for block handling.
|
* This tests InterDataNodeProtocol for block handling.
|
||||||
*/
|
*/
|
||||||
public class TestInterDatanodeProtocol {
|
public class TestInterDatanodeProtocol {
|
||||||
|
private static final String ADDRESS = "0.0.0.0";
|
||||||
|
final static private int PING_INTERVAL = 1000;
|
||||||
|
final static private int MIN_SLEEP_TIME = 1000;
|
||||||
|
private static Configuration conf = new HdfsConfiguration();
|
||||||
|
|
||||||
|
|
||||||
|
private static class TestServer extends Server {
|
||||||
|
private boolean sleep;
|
||||||
|
private Class<? extends Writable> responseClass;
|
||||||
|
|
||||||
|
public TestServer(int handlerCount, boolean sleep) throws IOException {
|
||||||
|
this(handlerCount, sleep, LongWritable.class, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestServer(int handlerCount, boolean sleep,
|
||||||
|
Class<? extends Writable> paramClass,
|
||||||
|
Class<? extends Writable> responseClass)
|
||||||
|
throws IOException {
|
||||||
|
super(ADDRESS, 0, paramClass, handlerCount, conf);
|
||||||
|
this.sleep = sleep;
|
||||||
|
this.responseClass = responseClass;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Writable call(Class<?> protocol, Writable param, long receiveTime)
|
||||||
|
throws IOException {
|
||||||
|
if (sleep) {
|
||||||
|
// sleep a bit
|
||||||
|
try {
|
||||||
|
Thread.sleep(PING_INTERVAL + MIN_SLEEP_TIME);
|
||||||
|
} catch (InterruptedException e) {}
|
||||||
|
}
|
||||||
|
if (responseClass != null) {
|
||||||
|
try {
|
||||||
|
return responseClass.newInstance();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return param; // echo param as result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void checkMetaInfo(ExtendedBlock b, DataNode dn) throws IOException {
|
public static void checkMetaInfo(ExtendedBlock b, DataNode dn) throws IOException {
|
||||||
Block metainfo = dn.data.getStoredBlock(b.getBlockPoolId(), b.getBlockId());
|
Block metainfo = dn.data.getStoredBlock(b.getBlockPoolId(), b.getBlockId());
|
||||||
Assert.assertEquals(b.getBlockId(), metainfo.getBlockId());
|
Assert.assertEquals(b.getBlockId(), metainfo.getBlockId());
|
||||||
@ -73,7 +132,6 @@ public static LocatedBlock getLastLocatedBlock(
|
|||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testBlockMetaDataInfo() throws Exception {
|
public void testBlockMetaDataInfo() throws Exception {
|
||||||
Configuration conf = new HdfsConfiguration();
|
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -222,7 +280,6 @@ public void testInitReplicaRecovery() throws IOException {
|
|||||||
* */
|
* */
|
||||||
@Test
|
@Test
|
||||||
public void testUpdateReplicaUnderRecovery() throws IOException {
|
public void testUpdateReplicaUnderRecovery() throws IOException {
|
||||||
final Configuration conf = new HdfsConfiguration();
|
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -291,4 +348,33 @@ public void testUpdateReplicaUnderRecovery() throws IOException {
|
|||||||
if (cluster != null) cluster.shutdown();
|
if (cluster != null) cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Test to verify that InterDatanode RPC timesout as expected when
|
||||||
|
* the server DN does not respond.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testInterDNProtocolTimeout() throws Exception {
|
||||||
|
final Server server = new TestServer(1, true);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
|
DatanodeID fakeDnId = new DatanodeID(
|
||||||
|
"localhost:" + addr.getPort(), "fake-storage", 0, addr.getPort());
|
||||||
|
DatanodeInfo dInfo = new DatanodeInfo(fakeDnId);
|
||||||
|
InterDatanodeProtocol proxy = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
proxy = DataNode.createInterDataNodeProtocolProxy(
|
||||||
|
dInfo, conf, 500);
|
||||||
|
proxy.initReplicaRecovery(null);
|
||||||
|
fail ("Expected SocketTimeoutException exception, but did not get.");
|
||||||
|
} catch (SocketTimeoutException e) {
|
||||||
|
DataNode.LOG.info("Got expected Exception: SocketTimeoutException" + e);
|
||||||
|
} finally {
|
||||||
|
if (proxy != null) {
|
||||||
|
RPC.stopProxy(proxy);
|
||||||
|
}
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user