diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index 4049b8061f..1fa95c2fe0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hdfs.web;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -34,8 +36,11 @@
 import java.util.Map;
 import java.util.StringTokenizer;
 
+import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -44,6 +49,7 @@
 import org.apache.hadoop.fs.DelegationTokenRenewer;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
@@ -545,7 +551,7 @@ public T run() throws IOException {
    * Also implements two-step connects for other operations redirected to
    * a DN such as open and checksum
    */
-  private HttpURLConnection connect(URL url) throws IOException {
+  protected HttpURLConnection connect(URL url) throws IOException {
     //redirect hostname and port
     String redirectHost = null;
 
@@ -698,7 +704,7 @@ private void shouldRetry(final IOException ioe, final int retry
    */
   abstract class AbstractFsPathRunner extends AbstractRunner {
     private final Path fspath;
-    private final Param[] parameters;
+    private Param[] parameters;
 
     AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
         Param... parameters) {
@@ -714,6 +720,10 @@ abstract class AbstractFsPathRunner extends AbstractRunner {
       this.parameters = parameters;
     }
 
+    protected void updateURLParameters(Param... p) {
+      this.parameters = p;
+    }
+
     @Override
     protected URL getUrl() throws IOException {
       if (excludeDatanodes.getValue() != null) {
@@ -1235,15 +1245,10 @@ public boolean delete(Path f, boolean recursive) throws IOException {
   }
 
   @Override
-  public FSDataInputStream open(final Path f, final int buffersize
+  public FSDataInputStream open(final Path f, final int bufferSize
       ) throws IOException {
     statistics.incrementReadOps(1);
-    final HttpOpParam.Op op = GetOpParam.Op.OPEN;
-    // use a runner so the open can recover from an invalid token
-    FsPathConnectionRunner runner =
-        new FsPathConnectionRunner(op, f, new BufferSizeParam(buffersize));
-    return new FSDataInputStream(new OffsetUrlInputStream(
-        new UnresolvedUrlOpener(runner), new OffsetUrlOpener(null)));
+    return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize));
   }
 
   @Override
@@ -1524,4 +1529,346 @@ public String getCanonicalServiceName() {
   InetSocketAddress[] getResolvedNNAddr() {
     return nnAddrs;
   }
+
+  @VisibleForTesting
+  public void setRetryPolicy(RetryPolicy rp) {
+    this.retryPolicy = rp;
+  }
+
+  /**
+   * This class is used for opening, reading, and seeking files while using the
+   * WebHdfsFileSystem.
+   * This class will invoke the retry policy when performing any of these
+   * actions.
+   */
+  @VisibleForTesting
+  public class WebHdfsInputStream extends FSInputStream {
+    private ReadRunner readRunner = null;
+
+    WebHdfsInputStream(Path path, int buffersize) throws IOException {
+      // Only create the ReadRunner once. Each read's byte array and position
+      // will be updated within the ReadRunner object before every read.
+      readRunner = new ReadRunner(path, buffersize);
+    }
+
+    @Override
+    public int read() throws IOException {
+      final byte[] b = new byte[1];
+      return (read(b, 0, 1) == -1) ? -1 : (b[0] & 0xff);
+    }
+
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+      return readRunner.read(b, off, len);
+    }
+
+    @Override
+    public void seek(long newPos) throws IOException {
+      readRunner.seek(newPos);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return readRunner.getPos();
+    }
+
+    protected int getBufferSize() throws IOException {
+      return readRunner.getBufferSize();
+    }
+
+    protected Path getPath() throws IOException {
+      return readRunner.getPath();
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+      readRunner.close();
+    }
+
+    public void setFileLength(long len) {
+      readRunner.setFileLength(len);
+    }
+
+    public long getFileLength() {
+      return readRunner.getFileLength();
+    }
+
+    @VisibleForTesting
+    ReadRunner getReadRunner() {
+      return readRunner;
+    }
+
+    @VisibleForTesting
+    void setReadRunner(ReadRunner rr) {
+      this.readRunner = rr;
+    }
+  }
+
+  enum RunnerState {
+    DISCONNECTED, // Connection is closed programmatically by ReadRunner
+    OPEN,         // Connection has been established by ReadRunner
+    SEEK,         // Calling code has explicitly called seek()
+    CLOSED        // Calling code has explicitly called close()
+  }
+
+  /**
+   * This class will allow retries to occur for both open and read operations.
+   * The first WebHdfsFileSystem#open creates a new WebHdfsInputStream object,
+   * which creates a new ReadRunner object that will be used to open a
+   * connection and read or seek into the input stream.
+   *
+   * ReadRunner is a subclass of the AbstractRunner class, which will run the
+   * ReadRunner#getUrl(), ReadRunner#connect(URL), and ReadRunner#getResponse
+   * methods within a retry loop, based on the configured retry policy.
+   * ReadRunner#connect will create a connection if one has not already been
+   * created. Otherwise, it will return the previously created connection
+   * object. This is necessary because a new connection should not be created
+   * for every read.
+   * Likewise, ReadRunner#getUrl will construct a new URL object only if the
+   * connection has not previously been established. Otherwise, it will return
+   * the previously created URL object.
+   * ReadRunner#getResponse will initialize the input stream if it has not
+   * already been initialized and read the requested data from the specified
+   * input stream.
+   */
+  @VisibleForTesting
+  protected class ReadRunner extends AbstractFsPathRunner {
+    private InputStream in = null;
+    private HttpURLConnection cachedConnection = null;
+    private byte[] readBuffer;
+    private int readOffset;
+    private int readLength;
+    private RunnerState runnerState = RunnerState.DISCONNECTED;
+    private URL originalUrl = null;
+    private URL resolvedUrl = null;
+
+    private final Path path;
+    private final int bufferSize;
+    private long pos = 0;
+    private long fileLength = 0;
+
+    /* The following methods are WebHdfsInputStream helpers. */
+
+    ReadRunner(Path p, int bs) throws IOException {
+      super(GetOpParam.Op.OPEN, p, new BufferSizeParam(bs));
+      this.path = p;
+      this.bufferSize = bs;
+    }
+
+    int read(byte[] b, int off, int len) throws IOException {
+      if (runnerState == RunnerState.CLOSED) {
+        throw new IOException("Stream closed");
+      }
+
+      // Before the first read, pos and fileLength will be 0 and readBuffer
+      // will be null. They will be initialized once the first connection
+      // is made. Only after that does it make sense to compare pos and
+      // fileLength.
+      if (pos >= fileLength && readBuffer != null) {
+        return -1;
+      }
+
+      // If a seek is occurring, the input stream will have been closed, so it
+      // needs to be reopened. Use the URLRunner to call AbstractRunner#connect
+      // with the previously-cached resolved URL and with the 'redirected' flag
+      // set to 'true'. The resolved URL contains the URL of the previously
+      // opened DN as opposed to the NN. It is preferable to use the resolved
+      // URL when creating a connection because it does not hit the NN on every
+      // seek, nor does it open a connection to a new DN after every seek.
+      // The redirect flag is needed so that AbstractRunner#connect knows the
+      // URL is already resolved.
+      // Note that when the redirected flag is set, retries are not attempted.
+      // So, if the connection fails using URLRunner, clear out the connection
+      // and fall through to establish the connection using ReadRunner.
+      if (runnerState == RunnerState.SEEK) {
+        try {
+          final URL rurl = new URL(resolvedUrl + "&" + new OffsetParam(pos));
+          cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true).run();
+        } catch (IOException ioe) {
+          closeInputStream(RunnerState.DISCONNECTED);
+        }
+      }
+
+      readBuffer = b;
+      readOffset = off;
+      readLength = len;
+
+      int count = -1;
+      count = this.run();
+      if (count >= 0) {
+        statistics.incrementBytesRead(count);
+        pos += count;
+      } else if (pos < fileLength) {
+        throw new EOFException(
+            "Premature EOF: pos=" + pos + " < filelength=" + fileLength);
+      }
+      return count;
+    }
+
+    void seek(long newPos) throws IOException {
+      if (pos != newPos) {
+        pos = newPos;
+        closeInputStream(RunnerState.SEEK);
+      }
+    }
+
+    public void close() throws IOException {
+      closeInputStream(RunnerState.CLOSED);
+    }
+
+    /* The following methods are overriding AbstractRunner methods,
+     * to be called within the retry policy context by runWithRetry.
+     */
+
+    @Override
+    protected URL getUrl() throws IOException {
+      // This method is called every time a read is executed.
+      // The check for connection == null is to ensure that a new URL is only
+      // created upon a new connection and not for every read.
+      if (cachedConnection == null) {
+        // Update URL with current offset. BufferSize doesn't change, but it
+        // still must be included when creating the new URL.
+        updateURLParameters(new BufferSizeParam(bufferSize),
+            new OffsetParam(pos));
+        originalUrl = super.getUrl();
+      }
+      return originalUrl;
+    }
+
+    /* Only make the connection if it is not already open. Don't cache the
+     * connection here. After this method is called, runWithRetry will call
+     * validateResponse, and then call the below ReadRunner#getResponse. If
+     * the code path makes it that far, then we can cache the connection.
+     */
+    @Override
+    protected HttpURLConnection connect(URL url) throws IOException {
+      HttpURLConnection conn = cachedConnection;
+      if (conn == null) {
+        try {
+          conn = super.connect(url);
+        } catch (IOException e) {
+          closeInputStream(RunnerState.DISCONNECTED);
+          throw e;
+        }
+      }
+      return conn;
+    }
+
+    /*
+     * This method is used to perform reads within the retry policy context.
+     * This code is relying on runWithRetry to always call the above connect
+     * method and the validateResponse method prior to calling getResponse.
+     */
+    @Override
+    Integer getResponse(final HttpURLConnection conn)
+        throws IOException {
+      try {
+        // In the "open-then-read" use case, runWithRetry will have executed
+        // ReadRunner#connect to make the connection and then executed
+        // validateResponse to validate the response code. Only then do we want
+        // to cache the connection.
+        // In the "read-after-seek" use case, the connection is made and the
+        // response is validated by the URLRunner. ReadRunner#read then caches
+        // the connection and ReadRunner#connect will pass on the cached
+        // connection.
+        // In either case, stream initialization is done here if necessary.
+        cachedConnection = conn;
+        if (in == null) {
+          in = initializeInputStream(conn);
+        }
+
+        int count = in.read(readBuffer, readOffset, readLength);
+        if (count < 0 && pos < fileLength) {
+          throw new EOFException(
+              "Premature EOF: pos=" + pos + " < filelength=" + fileLength);
+        }
+        return Integer.valueOf(count);
+      } catch (IOException e) {
+        String redirectHost = resolvedUrl.getAuthority();
+        if (excludeDatanodes.getValue() != null) {
+          excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
+              + excludeDatanodes.getValue());
+        } else {
+          excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
+        }
+
+        // If an exception occurs, close the input stream and null it out so
+        // that if the abstract runner decides to retry, it will reconnect.
+        closeInputStream(RunnerState.DISCONNECTED);
+        throw e;
+      }
+    }
+
+    @VisibleForTesting
+    InputStream initializeInputStream(HttpURLConnection conn)
+        throws IOException {
+      // Cache the resolved URL so that it can be used in the event of
+      // a future seek operation.
+      resolvedUrl = removeOffsetParam(conn.getURL());
+      final String cl = conn.getHeaderField(HttpHeaders.CONTENT_LENGTH);
+      InputStream inStream = conn.getInputStream();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("open file: " + conn.getURL());
+      }
+      if (cl != null) {
+        long streamLength = Long.parseLong(cl);
+        fileLength = pos + streamLength;
+        // Java has a bug with >2GB request streams. It won't bounds check
+        // the reads so the transfer blocks until the server times out.
+        inStream = new BoundedInputStream(inStream, streamLength);
+      } else {
+        fileLength = getHdfsFileStatus(path).getLen();
+      }
+      // Wrapping in BufferedInputStream because it is more performant than
+      // BoundedInputStream by itself.
+      runnerState = RunnerState.OPEN;
+      return new BufferedInputStream(inStream, bufferSize);
+    }
+
+    // Close both the InputStream and the connection.
+    @VisibleForTesting
+    void closeInputStream(RunnerState rs) throws IOException {
+      if (in != null) {
+        IOUtils.close(cachedConnection);
+        in = null;
+      }
+      cachedConnection = null;
+      runnerState = rs;
+    }
+
+    /* Getters and Setters */
+
+    @VisibleForTesting
+    protected InputStream getInputStream() {
+      return in;
+    }
+
+    @VisibleForTesting
+    protected void setInputStream(InputStream inStream) {
+      in = inStream;
+    }
+
+    Path getPath() {
+      return path;
+    }
+
+    int getBufferSize() {
+      return bufferSize;
+    }
+
+    long getFileLength() {
+      return fileLength;
+    }
+
+    void setFileLength(long len) {
+      fileLength = len;
+    }
+
+    long getPos() {
+      return pos;
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 699d5cc31d..d5f51ff2a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2555,6 +2555,9 @@ Release 2.7.3 - UNRELEASED
 
   IMPROVEMENTS
 
+    HDFS-7163. WebHdfsFileSystem should retry reads according to the configured
+    retry policy. (Eric Payne via kihwal)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 9e01235a9c..8da1ef0033 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -31,7 +31,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.junit.Assert;
@@ -186,7 +186,7 @@ static void verifySeek(FileSystem fs, Path srcPath, int fileLength)
       assertSeekAndRead(in, pos, fileLength);
     }
 
-    if (!(in.getWrappedStream() instanceof ByteRangeInputStream)) {
+    if (!(in.getWrappedStream() instanceof WebHdfsInputStream)) {
      try {
        in.seek(-1);
        Assert.fail("Should be failed if seek to negative offset");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
index eb9053c3f1..c526484126 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSXAttrBaseTest.java
@@ -1234,7 +1234,7 @@ private void verifyFileAccess(FileSystem userFs, boolean expectOpenFailure)
       throws Exception {
     // Test that a file with the xattr can or can't be opened.
     try {
-      userFs.open(filePath);
+      userFs.open(filePath).read();
       assertFalse("open succeeded but expected it to fail", expectOpenFailure);
     } catch (AccessControlException e) {
       assertTrue("open failed but expected it to succeed", expectOpenFailure);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
index 37b30637ad..c79e0c2af3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
@@ -260,7 +260,7 @@ public void testAuditWebHdfsOpen() throws Exception {
     setupAuditLogs();
     WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf,
         WebHdfsConstants.WEBHDFS_SCHEME);
-    webfs.open(file);
+    webfs.open(file).read();
 
     verifyAuditLogsCheckPattern(true, 3, webOpenPattern);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
index 0f601915a7..e8a26b7a73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
@@ -21,13 +21,17 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -43,12 +47,14 @@
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.TestDFSClientRetries;
@@ -59,11 +65,16 @@
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
 import org.apache.hadoop.hdfs.web.resources.LengthParam;
 import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 import org.apache.hadoop.hdfs.web.resources.Param;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
@@ -71,6 +82,15 @@
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 /** Test WebHDFS */
 public class TestWebHDFS {
   static final Log LOG = LogFactory.getLog(TestWebHDFS.class);
@@ -791,4 +811,142 @@ public WebHdfsFileSystem run() throws IOException {
       }
     });
   }
+
+  @Test(timeout=90000)
+  public void testWebHdfsReadRetries() throws Exception {
+    // ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    final Path dir = new Path("/testWebHdfsReadRetries");
+
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024*512);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+
+    final short numDatanodes = 1;
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDatanodes)
+        .build();
+    try {
+      cluster.waitActive();
+      final FileSystem fs = WebHdfsTestUtil
+          .getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
+
+      //create a file
+      final long length = 1L << 20;
+      final Path file1 = new Path(dir, "testFile");
+
+      DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L);
+
+      //get file status and check that it was written properly.
+      final FileStatus s1 = fs.getFileStatus(file1);
+      assertEquals("Write failed for file " + file1, length, s1.getLen());
+
+      // Ensure file can be read through WebHdfsInputStream
+      FSDataInputStream in = fs.open(file1);
+      assertTrue("Input stream is not an instance of class WebHdfsInputStream",
+          in.getWrappedStream() instanceof WebHdfsInputStream);
+      int count = 0;
+      for(; in.read() != -1; count++);
+      assertEquals("Read failed for file " + file1, s1.getLen(), count);
+      assertEquals("Should not be able to read beyond end of file",
+          in.read(), -1);
+      in.close();
+      try {
+        in.read();
+        fail("Read after close should have failed");
+      } catch(IOException ioe) { }
+
+      WebHdfsFileSystem wfs = (WebHdfsFileSystem)fs;
+      // Read should not be retried if AccessControlException is encountered.
+      String msg = "ReadRetries: Test Access Control Exception";
+      testReadRetryExceptionHelper(wfs, file1,
+          new AccessControlException(msg), msg, false, 1);
+
+      // Retry policy should be invoked if IOExceptions are thrown.
+      msg = "ReadRetries: Test SocketTimeoutException";
+      testReadRetryExceptionHelper(wfs, file1,
+          new SocketTimeoutException(msg), msg, true, 5);
+      msg = "ReadRetries: Test SocketException";
+      testReadRetryExceptionHelper(wfs, file1,
+          new SocketException(msg), msg, true, 5);
+      msg = "ReadRetries: Test EOFException";
+      testReadRetryExceptionHelper(wfs, file1,
+          new EOFException(msg), msg, true, 5);
+      msg = "ReadRetries: Test Generic IO Exception";
+      testReadRetryExceptionHelper(wfs, file1,
+          new IOException(msg), msg, true, 5);
+
+      // If InvalidToken exception occurs, WebHdfs only retries if the
+      // delegation token was replaced. Do that twice, then verify by checking
+      // the number of times it tried.
+      WebHdfsFileSystem spyfs = spy(wfs);
+      when(spyfs.replaceExpiredDelegationToken()).thenReturn(true, true, false);
+      msg = "ReadRetries: Test Invalid Token Exception";
+      testReadRetryExceptionHelper(spyfs, file1,
+          new InvalidToken(msg), msg, false, 3);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  public boolean attemptedRetry;
+  private void testReadRetryExceptionHelper(WebHdfsFileSystem fs, Path fn,
+      final IOException ex, String msg, boolean shouldAttemptRetry,
+      int numTimesTried)
+      throws Exception {
+    // Override WebHdfsInputStream#getInputStream so that it returns
+    // an input stream that throws the specified exception when read
+    // is called.
+    FSDataInputStream in = fs.open(fn);
+    in.read(); // Connection is made only when the first read() occurs.
+    final WebHdfsInputStream webIn =
+        (WebHdfsInputStream)(in.getWrappedStream());
+
+    final InputStream spyInputStream =
+        spy(webIn.getReadRunner().getInputStream());
+    doThrow(ex).when(spyInputStream).read((byte[])any(), anyInt(), anyInt());
+    final WebHdfsFileSystem.ReadRunner rr = spy(webIn.getReadRunner());
+    doReturn(spyInputStream)
+        .when(rr).initializeInputStream((HttpURLConnection) any());
+    rr.setInputStream(spyInputStream);
+    webIn.setReadRunner(rr);
+
+    // Override filesystem's retry policy in order to verify that
+    // WebHdfsInputStream is calling shouldRetry for the appropriate
+    // exceptions.
+    final RetryAction retryAction = new RetryAction(RetryDecision.RETRY);
+    final RetryAction failAction = new RetryAction(RetryDecision.FAIL);
+    RetryPolicy rp = new RetryPolicy() {
+      @Override
+      public RetryAction shouldRetry(Exception e, int retries, int failovers,
+          boolean isIdempotentOrAtMostOnce) throws Exception {
+        attemptedRetry = true;
+        if (retries > 3) {
+          return failAction;
+        } else {
+          return retryAction;
+        }
+      }
+    };
+    fs.setRetryPolicy(rp);
+
+    // If the retry logic is exercised, attemptedRetry will be true. Some
+    // exceptions should exercise the retry logic and others should not.
+    // Either way, the value of attemptedRetry should match shouldAttemptRetry.
+    attemptedRetry = false;
+    try {
+      webIn.read();
+      fail(msg + ": Read should have thrown exception.");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains(msg));
+    }
+    assertEquals(msg + ": Read should " + (shouldAttemptRetry ? "" : "not ")
+        + "have called shouldRetry. ",
+        attemptedRetry, shouldAttemptRetry);
+
+    verify(rr, times(numTimesTried)).getResponse((HttpURLConnection) any());
+    webIn.close();
+    in.close();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
index 5d2001414e..a68b1ace94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
@@ -182,7 +182,7 @@ public void testOpenNonExistFile() throws IOException {
     final Path p = new Path("/test/testOpenNonExistFile");
     //open it as a file, should get FileNotFoundException
     try {
-      fs.open(p);
+      fs.open(p).read();
       fail("Expected FileNotFoundException was not thrown");
     } catch(FileNotFoundException fnfe) {
       WebHdfsFileSystem.LOG.info("This is expected.", fnfe);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java
index 5e8568c442..cb1efaeee0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java
@@ -383,7 +383,9 @@ public WebHdfsFileSystem run() throws IOException {
       reset(fs);
 
       // verify an expired token is replaced with a new token
-      fs.open(p).close();
+      InputStream is = fs.open(p);
+      is.read();
+      is.close();
       verify(fs, times(2)).getDelegationToken(); // first bad, then good
       verify(fs, times(1)).replaceExpiredDelegationToken();
       verify(fs, times(1)).getDelegationToken(null);
@@ -398,7 +400,7 @@ public WebHdfsFileSystem run() throws IOException {
       // verify with open because it's a little different in how it
       // opens connections
       fs.cancelDelegationToken(fs.getRenewToken());
-      InputStream is = fs.open(p);
+      is = fs.open(p);
       is.read();
       is.close();
       verify(fs, times(2)).getDelegationToken(); // first bad, then good