diff --git a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/AuthenticationFilter.java b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/AuthenticationFilter.java index f7305d0282..b37f39a50c 100644 --- a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/AuthenticationFilter.java +++ b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/AuthenticationFilter.java @@ -331,7 +331,14 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha HttpServletResponse httpResponse = (HttpServletResponse) response; try { boolean newToken = false; - AuthenticationToken token = getToken(httpRequest); + AuthenticationToken token; + try { + token = getToken(httpRequest); + } + catch (AuthenticationException ex) { + LOG.warn("AuthenticationToken ignored: " + ex.getMessage()); + token = null; + } if (token == null) { if (LOG.isDebugEnabled()) { LOG.debug("Request [{}] triggering authentication", getRequestURL(httpRequest)); @@ -371,6 +378,9 @@ public Principal getUserPrincipal() { } filterChain.doFilter(httpRequest, httpResponse); } + else { + throw new AuthenticationException("Missing AuthenticationToken"); + } } catch (AuthenticationException ex) { if (!httpResponse.isCommitted()) { Cookie cookie = createCookie(""); diff --git a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/KerberosName.java b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/KerberosName.java index 6395867071..9564026937 100644 --- a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/KerberosName.java +++ b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/KerberosName.java @@ -23,10 +23,11 @@ import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.lang.reflect.Method; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -38,6 +39,8 @@ @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceStability.Evolving public class KerberosName { + private static final Logger LOG = LoggerFactory.getLogger(KerberosName.class); + /** The first component of the name */ private final String serviceName; /** The second component of the name. It may be null. */ @@ -81,6 +84,7 @@ public class KerberosName { try { defaultRealm = KerberosUtil.getDefaultRealm(); } catch (Exception ke) { + LOG.warn("Kerberos krb5 configuration not found, setting default realm to empty"); defaultRealm=""; } } diff --git a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestAuthenticationFilter.java b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestAuthenticationFilter.java index 43687493b4..4f1bc111a7 100644 --- a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestAuthenticationFilter.java +++ b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestAuthenticationFilter.java @@ -349,7 +349,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { } } - private void _testDoFilterAuthentication(boolean withDomainPath) throws Exception { + private void _testDoFilterAuthentication(boolean withDomainPath, boolean invalidToken) throws Exception { AuthenticationFilter filter = new AuthenticationFilter(); try { FilterConfig config = Mockito.mock(FilterConfig.class); @@ -380,6 +380,12 @@ private void _testDoFilterAuthentication(boolean withDomainPath) throws Exceptio Mockito.when(request.getRequestURL()).thenReturn(new StringBuffer("http://foo:8080/bar")); Mockito.when(request.getQueryString()).thenReturn("authenticated=true"); + if (invalidToken) { + Mockito.when(request.getCookies()).thenReturn( + new Cookie[] { new Cookie(AuthenticatedURL.AUTH_COOKIE, "foo")} + ); + } + HttpServletResponse response = Mockito.mock(HttpServletResponse.class); FilterChain chain = Mockito.mock(FilterChain.class); @@ -437,11 +443,15 @@ public Object answer(InvocationOnMock invocation) throws Throwable { } public void testDoFilterAuthentication() throws Exception { - _testDoFilterAuthentication(false); + _testDoFilterAuthentication(false, false); + } + + public void testDoFilterAuthenticationWithInvalidToken() throws Exception { + _testDoFilterAuthentication(false, true); } public void testDoFilterAuthenticationWithDomainPath() throws Exception { - _testDoFilterAuthentication(true); + _testDoFilterAuthentication(true, false); } public void testDoFilterAuthenticated() throws Exception { diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index c51b413c3f..309f5d244e 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -255,6 +255,9 @@ Release 2.0.0 - UNRELEASED HADOOP-8077. HA: fencing method should be able to be configured on a per-NN or per-NS basis (todd) + HADOOP-8086. KerberosName silently sets defaultRealm to "" if the + Kerberos config is not found, it should log a WARN (tucu) + OPTIMIZATIONS BUG FIXES @@ -329,6 +332,9 @@ Release 2.0.0 - UNRELEASED HADOOP-8251. Fix SecurityUtil.fetchServiceTicket after HADOOP-6941 (todd) + HADOOP-8249. invalid hadoop-auth cookies should trigger authentication + if info is avail before returning HTTP 401 (tucu) + BREAKDOWN OF HADOOP-7454 SUBTASKS HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh) @@ -405,6 +411,9 @@ Release 0.23.3 - UNRELEASED HADOOP-8180. Remove hsqldb since its not needed from pom.xml (Ravi Prakash via tgraves) + HADOOP-8014. ViewFileSystem does not correctly implement getDefaultBlockSize, + getDefaultReplication, getContentSummary (John George via bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 61db1ed5e1..288a3033ed 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -579,7 +579,8 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, * * The FileSystem will simply return an elt containing 'localhost'. * - * @param p path of file to get locations for + * @param p path is used to identify an FS since an FS could have + * another FS that it could be delegating the call to * @param start offset into the given file * @param len length for which to get locations for */ @@ -602,10 +603,21 @@ public FsServerDefaults getServerDefaults() throws IOException { return new FsServerDefaults(getDefaultBlockSize(), conf.getInt("io.bytes.per.checksum", 512), 64 * 1024, - getDefaultReplication(), + getDefaultReplication(), conf.getInt("io.file.buffer.size", 4096)); } - + + /** + * Return a set of server default configuration values + * @param p path is used to identify an FS since an FS could have + * another FS that it could be delegating the call to + * @return server default configuration values + * @throws IOException + */ + public FsServerDefaults getServerDefaults(Path p) throws IOException { + return getServerDefaults(); + } + /** * Return the fully-qualified path of path f resolving the path * through any symlinks or mount point @@ -653,8 +665,8 @@ public FSDataOutputStream create(Path f, boolean overwrite) throws IOException { return create(f, overwrite, getConf().getInt("io.file.buffer.size", 4096), - getDefaultReplication(), - getDefaultBlockSize()); + getDefaultReplication(f), + getDefaultBlockSize(f)); } /** @@ -668,8 +680,8 @@ public FSDataOutputStream create(Path f, Progressable progress) throws IOException { return create(f, true, getConf().getInt("io.file.buffer.size", 4096), - getDefaultReplication(), - getDefaultBlockSize(), progress); + getDefaultReplication(f), + getDefaultBlockSize(f), progress); } /** @@ -683,7 +695,7 @@ public FSDataOutputStream create(Path f, short replication) return create(f, true, getConf().getInt("io.file.buffer.size", 4096), replication, - getDefaultBlockSize()); + getDefaultBlockSize(f)); } /** @@ -699,7 +711,7 @@ public FSDataOutputStream create(Path f, short replication, return create(f, true, getConf().getInt("io.file.buffer.size", 4096), replication, - getDefaultBlockSize(), progress); + getDefaultBlockSize(f), progress); } @@ -715,8 +727,8 @@ public FSDataOutputStream create(Path f, int bufferSize ) throws IOException { return create(f, overwrite, bufferSize, - getDefaultReplication(), - getDefaultBlockSize()); + getDefaultReplication(f), + getDefaultBlockSize(f)); } /** @@ -733,8 +745,8 @@ public FSDataOutputStream create(Path f, Progressable progress ) throws IOException { return create(f, overwrite, bufferSize, - getDefaultReplication(), - getDefaultBlockSize(), progress); + getDefaultReplication(f), + getDefaultBlockSize(f), progress); } @@ -1916,11 +1928,31 @@ public long getDefaultBlockSize() { return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024); } + /** Return the number of bytes that large input files should be optimally + * be split into to minimize i/o time. The given path will be used to + * locate the actual filesystem. The full path does not have to exist. + * @param f path of file + * @return the default block size for the path's filesystem + */ + public long getDefaultBlockSize(Path f) { + return getDefaultBlockSize(); + } + /** * Get the default replication. */ public short getDefaultReplication() { return 1; } + /** + * Get the default replication for a path. The given path will be used to + * locate the actual filesystem. The full path does not have to exist. + * @param path of the file + * @return default replication for the path's filesystem + */ + public short getDefaultReplication(Path path) { + return getDefaultReplication(); + } + /** * Return a file status object that represents the path. * @param f The path we want information from diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index 91ee2ae710..1794c3d032 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; @@ -267,6 +268,7 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException { return fs.mkdirs(f, permission); } + /** * The src file is on the local disk. Add it to FS at * the given dst name. @@ -336,19 +338,42 @@ public long getUsed() throws IOException{ return fs.getUsed(); } - /** Return the number of bytes that large input files should be optimally - * be split into to minimize i/o time. */ + @Override public long getDefaultBlockSize() { return fs.getDefaultBlockSize(); } - /** - * Get the default replication. - */ + @Override public short getDefaultReplication() { return fs.getDefaultReplication(); } + @Override + public FsServerDefaults getServerDefaults() throws IOException { + return fs.getServerDefaults(); + } + + // path variants delegate to underlying filesystem + @Override + public ContentSummary getContentSummary(Path f) throws IOException { + return fs.getContentSummary(f); + } + + @Override + public long getDefaultBlockSize(Path f) { + return fs.getDefaultBlockSize(f); + } + + @Override + public short getDefaultReplication(Path f) { + return fs.getDefaultReplication(f); + } + + @Override + public FsServerDefaults getServerDefaults(Path f) throws IOException { + return fs.getServerDefaults(f); + } + /** * Get file status. */ @@ -441,4 +466,4 @@ public List> getDelegationTokens(String renewer, Credentials credentials) throws IOException { return fs.getDelegationTokens(renewer, credentials); } -} \ No newline at end of file +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java index 18ec724b7a..209fd216d1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ChRootedFileSystem.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileChecksum; @@ -208,11 +209,6 @@ public FsStatus getStatus(Path p) throws IOException { return super.getStatus(fullPath(p)); } - @Override - public FsServerDefaults getServerDefaults() throws IOException { - return super.getServerDefaults(); - } - @Override public FileStatus[] listStatus(final Path f) throws IOException { @@ -273,4 +269,42 @@ public void setTimes(final Path f, final long mtime, final long atime) public Path resolvePath(final Path p) throws IOException { return super.resolvePath(fullPath(p)); } + + @Override + public ContentSummary getContentSummary(Path f) throws IOException { + return super.getContentSummary(fullPath(f)); + } + + + private static Path rootPath = new Path(Path.SEPARATOR); + + @Override + public long getDefaultBlockSize() { + return getDefaultBlockSize(fullPath(rootPath)); + } + + @Override + public long getDefaultBlockSize(Path f) { + return super.getDefaultBlockSize(fullPath(f)); + } + + @Override + public short getDefaultReplication() { + return getDefaultReplication(fullPath(rootPath)); + } + + @Override + public short getDefaultReplication(Path f) { + return super.getDefaultReplication(fullPath(f)); + } + + @Override + public FsServerDefaults getServerDefaults() throws IOException { + return getServerDefaults(fullPath(rootPath)); + } + + @Override + public FsServerDefaults getServerDefaults(Path f) throws IOException { + return super.getServerDefaults(fullPath(f)); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NotInMountpointException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NotInMountpointException.java new file mode 100644 index 0000000000..f92108cfe7 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NotInMountpointException.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.viewfs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; + +import org.apache.hadoop.fs.Path; + +/** + * NotInMountpointException extends the UnsupportedOperationException. + * Exception class used in cases where the given path is not mounted + * through viewfs. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */ +@SuppressWarnings("serial") +public class NotInMountpointException extends UnsupportedOperationException { + final String msg; + + public NotInMountpointException(Path path, String operation) { + msg = operation + " on path `" + path + "' is not within a mount point"; + } + + public NotInMountpointException(String operation) { + msg = operation + " on empty path is invalid"; + } + + @Override + public String getMessage() { + return msg; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java index b3d19f7734..c2bdaaad9d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java @@ -34,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -41,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsConstants; +import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; @@ -470,6 +472,57 @@ public void setVerifyChecksum(final boolean verifyChecksum) { } } + @Override + public long getDefaultBlockSize() { + throw new NotInMountpointException("getDefaultBlockSize"); + } + + @Override + public short getDefaultReplication() { + throw new NotInMountpointException("getDefaultReplication"); + } + + @Override + public FsServerDefaults getServerDefaults() throws IOException { + throw new NotInMountpointException("getServerDefaults"); + } + + @Override + public long getDefaultBlockSize(Path f) { + try { + InodeTree.ResolveResult res = + fsState.resolve(getUriPath(f), true); + return res.targetFileSystem.getDefaultBlockSize(res.remainingPath); + } catch (FileNotFoundException e) { + throw new NotInMountpointException(f, "getDefaultBlockSize"); + } + } + + @Override + public short getDefaultReplication(Path f) { + try { + InodeTree.ResolveResult res = + fsState.resolve(getUriPath(f), true); + return res.targetFileSystem.getDefaultReplication(res.remainingPath); + } catch (FileNotFoundException e) { + throw new NotInMountpointException(f, "getDefaultReplication"); + } + } + + @Override + public FsServerDefaults getServerDefaults(Path f) throws IOException { + InodeTree.ResolveResult res = + fsState.resolve(getUriPath(f), true); + return res.targetFileSystem.getServerDefaults(res.remainingPath); + } + + @Override + public ContentSummary getContentSummary(Path f) throws IOException { + InodeTree.ResolveResult res = + fsState.resolve(getUriPath(f), true); + return res.targetFileSystem.getContentSummary(res.remainingPath); + } + @Override public void setWriteChecksum(final boolean writeChecksum) { List> mountPoints = @@ -742,5 +795,20 @@ public void setTimes(Path f, long mtime, long atime) public void setVerifyChecksum(boolean verifyChecksum) { // Noop for viewfs } + + @Override + public FsServerDefaults getServerDefaults(Path f) throws IOException { + throw new NotInMountpointException(f, "getServerDefaults"); + } + + @Override + public long getDefaultBlockSize(Path f) { + throw new NotInMountpointException(f, "getDefaultBlockSize"); + } + + @Override + public short getDefaultReplication(Path f) { + throw new NotInMountpointException(f, "getDefaultReplication"); + } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestHelper.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestHelper.java index 05fec95631..035a0165fb 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestHelper.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestHelper.java @@ -36,6 +36,7 @@ public final class FileSystemTestHelper { System.getProperty("test.build.data", "target/test/data") + "/test"; private static final int DEFAULT_BLOCK_SIZE = 1024; private static final int DEFAULT_NUM_BLOCKS = 2; + private static final short DEFAULT_NUM_REPL = 1; private static String absTestRootDir = null; /** Hidden constructor */ @@ -99,9 +100,9 @@ public static Path getDefaultWorkingDirectory(FileSystem fSys) * Create files with numBlocks blocks each with block size blockSize. */ public static long createFile(FileSystem fSys, Path path, int numBlocks, - int blockSize, boolean createParent) throws IOException { + int blockSize, short numRepl, boolean createParent) throws IOException { FSDataOutputStream out = - fSys.create(path, false, 4096, fSys.getDefaultReplication(), blockSize ); + fSys.create(path, false, 4096, numRepl, blockSize ); byte[] data = getFileData(numBlocks, blockSize); out.write(data, 0, data.length); @@ -109,13 +110,19 @@ public static long createFile(FileSystem fSys, Path path, int numBlocks, return data.length; } + + public static long createFile(FileSystem fSys, Path path, int numBlocks, + int blockSize, boolean createParent) throws IOException { + return createFile(fSys, path, numBlocks, blockSize, fSys.getDefaultReplication(), true); + } + public static long createFile(FileSystem fSys, Path path, int numBlocks, int blockSize) throws IOException { return createFile(fSys, path, numBlocks, blockSize, true); - } + } public static long createFile(FileSystem fSys, Path path) throws IOException { - return createFile(fSys, path, DEFAULT_NUM_BLOCKS, DEFAULT_BLOCK_SIZE, true); + return createFile(fSys, path, DEFAULT_NUM_BLOCKS, DEFAULT_BLOCK_SIZE, DEFAULT_NUM_REPL, true); } public static long createFile(FileSystem fSys, String name) throws IOException { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestChRootedFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestChRootedFileSystem.java index c46ab96f37..127866be1b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestChRootedFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestChRootedFileSystem.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.FsConstants; @@ -170,7 +171,15 @@ public void testRename() throws IOException { Assert.assertTrue(fSys.isDirectory(FileSystemTestHelper.getTestRootPath(fSys,"/newDir/dirFooBar"))); Assert.assertTrue(fSysTarget.isDirectory(new Path(chrootedTo,"newDir/dirFooBar"))); } - + + @Test + public void testGetContentSummary() throws IOException { + // GetContentSummary of a dir + fSys.mkdirs(new Path("/newDir/dirFoo")); + ContentSummary cs = fSys.getContentSummary(new Path("/newDir/dirFoo")); + Assert.assertEquals(-1L, cs.getQuota()); + Assert.assertEquals(-1L, cs.getSpaceQuota()); + } /** * We would have liked renames across file system to fail but diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 108f4b5431..115a855139 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -62,14 +62,14 @@ Trunk (unreleased changes) HDFS-3178. Add states and state handler for journal synchronization in JournalService. (szetszwo) - HDFS-3204. Minor modification to JournalProtocol.proto to make - it generic. (suresh) - OPTIMIZATIONS HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. (Henry Robinson via todd) + HDFS-3110. Use directRead API to reduce the number of buffer copies in + libhdfs (Henry Robinson via todd) + BUG FIXES HDFS-2299. TestOfflineEditsViewer is failing on trunk. (Uma Maheswara Rao G @@ -114,6 +114,9 @@ Trunk (unreleased changes) HDFS-3126. Journal stream from Namenode to BackupNode needs to have timeout. (Hari Mankude via suresh) + + HDFS-3121. Add HDFS tests for HADOOP-8014 change. (John George via + suresh) Release 2.0.0 - UNRELEASED @@ -327,6 +330,17 @@ Release 2.0.0 - UNRELEASED HDFS-3050. rework OEV to share more code with the NameNode. (Colin Patrick McCabe via eli) + HDFS-3226. Allow GetConf tool to print arbitrary keys (todd) + + HDFS-3204. Minor modification to JournalProtocol.proto to make + it generic. (suresh) + + HDFS-2505. Add a test to verify getFileChecksum(..) with ViewFS. (Ravi + Prakash via szetszwo) + + HDFS-3211. Add fence(..) and replace NamenodeRegistration with JournalInfo + and epoch in JournalProtocol. (suresh via szetszwo) + OPTIMIZATIONS HDFS-3024. Improve performance of stringification in addStoredBlock (todd) @@ -436,6 +450,9 @@ Release 2.0.0 - UNRELEASED HDFS-3208. Bogus entries in hosts files are incorrectly displayed in the report. (eli) + HDFS-3136. Remove SLF4J dependency as HDFS does not need it to fix + unnecessary warnings. (Jason Lowe via suresh) + BREAKDOWN OF HDFS-1623 SUBTASKS HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index a410cc0bad..7198f9fc16 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -90,16 +90,6 @@ junit test - - org.slf4j - slf4j-api - compile - - - org.slf4j - slf4j-log4j12 - compile - org.mockito mockito-all diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java index 941a320a79..eabdd22a97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; /** @@ -33,6 +34,10 @@ public class UnregisteredNodeException extends IOException { private static final long serialVersionUID = -5620209396945970810L; + public UnregisteredNodeException(JournalInfo info) { + super("Unregistered server: " + info.toString()); + } + public UnregisteredNodeException(NodeRegistration nodeReg) { super("Unregistered server: " + nodeReg.toString()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java index 1858e70980..1805d14664 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java @@ -20,10 +20,13 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto; +import org.apache.hadoop.hdfs.server.protocol.FenceResponse; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import com.google.protobuf.RpcController; @@ -48,9 +51,8 @@ public JournalProtocolServerSideTranslatorPB(JournalProtocol impl) { public JournalResponseProto journal(RpcController unused, JournalRequestProto req) throws ServiceException { try { - impl.journal(PBHelper.convert(req.getJournalInfo()), - req.getFirstTxnId(), req.getNumTxns(), req.getRecords() - .toByteArray()); + impl.journal(PBHelper.convert(req.getJournalInfo()), req.getEpoch(), + req.getFirstTxnId(), req.getNumTxns(), req.getRecords().toByteArray()); } catch (IOException e) { throw new ServiceException(e); } @@ -63,10 +65,24 @@ public StartLogSegmentResponseProto startLogSegment(RpcController controller, StartLogSegmentRequestProto req) throws ServiceException { try { impl.startLogSegment(PBHelper.convert(req.getJournalInfo()), - req.getTxid()); + req.getEpoch(), req.getTxid()); } catch (IOException e) { throw new ServiceException(e); } return StartLogSegmentResponseProto.newBuilder().build(); } + + @Override + public FenceResponseProto fence(RpcController controller, + FenceRequestProto req) throws ServiceException { + try { + FenceResponse resp = impl.fence(PBHelper.convert(req.getJournalInfo()), req.getEpoch(), + req.getFencerInfo()); + return FenceResponseProto.newBuilder().setInSync(resp.isInSync()) + .setLastTransactionId(resp.getLastTransactionId()) + .setPreviousEpoch(resp.getPreviousEpoch()).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java index 9258180e52..d14e4e22fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java @@ -22,10 +22,13 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto; +import org.apache.hadoop.hdfs.server.protocol.FenceResponse; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; -import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.RPC; @@ -58,10 +61,11 @@ public void close() { } @Override - public void journal(NamenodeRegistration reg, long firstTxnId, + public void journal(JournalInfo journalInfo, long epoch, long firstTxnId, int numTxns, byte[] records) throws IOException { JournalRequestProto req = JournalRequestProto.newBuilder() - .setJournalInfo(PBHelper.convertToJournalInfo(reg)) + .setJournalInfo(PBHelper.convert(journalInfo)) + .setEpoch(epoch) .setFirstTxnId(firstTxnId) .setNumTxns(numTxns) .setRecords(PBHelper.getByteString(records)) @@ -74,10 +78,11 @@ public void journal(NamenodeRegistration reg, long firstTxnId, } @Override - public void startLogSegment(NamenodeRegistration registration, long txid) + public void startLogSegment(JournalInfo journalInfo, long epoch, long txid) throws IOException { StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder() - .setJournalInfo(PBHelper.convertToJournalInfo(registration)) + .setJournalInfo(PBHelper.convert(journalInfo)) + .setEpoch(epoch) .setTxid(txid) .build(); try { @@ -86,6 +91,20 @@ public void startLogSegment(NamenodeRegistration registration, long txid) throw ProtobufHelper.getRemoteException(e); } } + + @Override + public FenceResponse fence(JournalInfo journalInfo, long epoch, + String fencerInfo) throws IOException { + FenceRequestProto req = FenceRequestProto.newBuilder().setEpoch(epoch) + .setJournalInfo(PBHelper.convert(journalInfo)).build(); + try { + FenceResponseProto resp = rpcProxy.fence(NULL_CONTROLLER, req); + return new FenceResponse(resp.getPreviousEpoch(), + resp.getLastTransactionId(), resp.getInSync()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } @Override public boolean isMethodSupported(String methodName) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index e084862e82..fc50606f4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -110,6 +110,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -117,6 +118,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; @@ -127,7 +129,6 @@ import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; -import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; @@ -1347,25 +1348,19 @@ public static StorageReportProto convert(StorageReport r) { .setStorageID(r.getStorageID()).build(); } - public static NamenodeRegistration convert(JournalInfoProto info) { + public static JournalInfo convert(JournalInfoProto info) { int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0; int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0; - StorageInfo storage = new StorageInfo(lv, nsID, info.getClusterID(), 0); - - // Note that the role is always {@link NamenodeRole#NAMENODE} as this - // conversion happens for messages from Namenode to Journal receivers. - // Addresses in the registration are unused. - return new NamenodeRegistration("", "", storage, NamenodeRole.NAMENODE); + return new JournalInfo(lv, info.getClusterID(), nsID); } /** * Method used for converting {@link JournalInfoProto} sent from Namenode * to Journal receivers to {@link NamenodeRegistration}. */ - public static JournalInfoProto convertToJournalInfo( - NamenodeRegistration reg) { - return JournalInfoProto.newBuilder().setClusterID(reg.getClusterID()) - .setLayoutVersion(reg.getLayoutVersion()) - .setNamespaceID(reg.getNamespaceID()).build(); + public static JournalInfoProto convert(JournalInfo j) { + return JournalInfoProto.newBuilder().setClusterID(j.getClusterId()) + .setLayoutVersion(j.getLayoutVersion()) + .setNamespaceID(j.getNamespaceId()).build(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java index 71210c6140..4e25eea313 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java @@ -31,6 +31,9 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.protocol.FenceResponse; +import org.apache.hadoop.hdfs.server.protocol.FencedException; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; @@ -40,6 +43,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.BlockingService; /** @@ -66,6 +70,8 @@ public class JournalService implements JournalProtocol { private final NamenodeProtocol namenode; private final StateHandler stateHandler = new StateHandler(); private final RPC.Server rpcServer; + private long epoch = 0; + private String fencerInfo; enum State { /** The service is initialized and ready to start. */ @@ -115,7 +121,7 @@ synchronized void waitForRoll() { current = State.WAITING_FOR_ROLL; } - synchronized void startLogSegment() throws IOException { + synchronized void startLogSegment() { if (current == State.WAITING_FOR_ROLL) { current = State.SYNCING; } @@ -232,28 +238,42 @@ public void stop() { } @Override - public void journal(NamenodeRegistration registration, long firstTxnId, + public void journal(JournalInfo journalInfo, long epoch, long firstTxnId, int numTxns, byte[] records) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Received journal " + firstTxnId + " " + numTxns); } stateHandler.isJournalAllowed(); - verify(registration); + verify(epoch, journalInfo); listener.journal(this, firstTxnId, numTxns, records); } @Override - public void startLogSegment(NamenodeRegistration registration, long txid) + public void startLogSegment(JournalInfo journalInfo, long epoch, long txid) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Received startLogSegment " + txid); } stateHandler.isStartLogSegmentAllowed(); - verify(registration); + verify(epoch, journalInfo); listener.rollLogs(this, txid); stateHandler.startLogSegment(); } + @Override + public FenceResponse fence(JournalInfo journalInfo, long epoch, + String fencerInfo) throws IOException { + LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch); + verifyFence(epoch, fencerInfo); + verify(journalInfo); + long previousEpoch = epoch; + this.epoch = epoch; + this.fencerInfo = fencerInfo; + + // TODO:HDFS-3092 set lastTransId and inSync + return new FenceResponse(previousEpoch, 0, false); + } + /** Create an RPC server. */ private static RPC.Server createRpcServer(Configuration conf, InetSocketAddress address, JournalProtocol impl) throws IOException { @@ -267,15 +287,54 @@ private static RPC.Server createRpcServer(Configuration conf, address.getHostName(), address.getPort(), 1, false, conf, null); } - private void verify(NamenodeRegistration reg) throws IOException { - if (!registration.getRegistrationID().equals(reg.getRegistrationID())) { - LOG.warn("Invalid registrationID - expected: " - + registration.getRegistrationID() + " received: " - + reg.getRegistrationID()); - throw new UnregisteredNodeException(reg); + private void verifyEpoch(long e) throws FencedException { + if (epoch != e) { + String errorMsg = "Epoch " + e + " is not valid. " + + "Resource has already been fenced by " + fencerInfo + + " with epoch " + epoch; + LOG.warn(errorMsg); + throw new FencedException(errorMsg); } } + private void verifyFence(long e, String fencer) throws FencedException { + if (e <= epoch) { + String errorMsg = "Epoch " + e + " from fencer " + fencer + + " is not valid. " + "Resource has already been fenced by " + + fencerInfo + " with epoch " + epoch; + LOG.warn(errorMsg); + throw new FencedException(errorMsg); + } + } + + /** + * Verifies a journal request + */ + private void verify(JournalInfo journalInfo) throws IOException { + String errorMsg = null; + int expectedNamespaceID = registration.getNamespaceID(); + if (journalInfo.getNamespaceId() != expectedNamespaceID) { + errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID + + " actual " + journalInfo.getNamespaceId(); + LOG.warn(errorMsg); + throw new UnregisteredNodeException(journalInfo); + } + if (!journalInfo.getClusterId().equals(registration.getClusterID())) { + errorMsg = "Invalid clusterId in journal request - expected " + + journalInfo.getClusterId() + " actual " + registration.getClusterID(); + LOG.warn(errorMsg); + throw new UnregisteredNodeException(journalInfo); + } + } + + /** + * Verifies a journal request + */ + private void verify(long e, JournalInfo journalInfo) throws IOException { + verifyEpoch(e); + verify(journalInfo); + } + /** * Register this service with the active namenode. */ @@ -298,4 +357,9 @@ private void handshake() throws IOException { listener.verifyVersion(this, nsInfo); registration.setStorageInfo(nsInfo); } -} \ No newline at end of file + + @VisibleForTesting + long getEpoch() { + return epoch; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java index de75b76934..ebf4f480f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java @@ -19,6 +19,7 @@ import java.io.IOException; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; /** @@ -26,19 +27,20 @@ * to a BackupNode. */ class BackupJournalManager implements JournalManager { - - private final NamenodeRegistration nnReg; private final NamenodeRegistration bnReg; + private final JournalInfo journalInfo; BackupJournalManager(NamenodeRegistration bnReg, NamenodeRegistration nnReg) { + journalInfo = new JournalInfo(nnReg.getLayoutVersion(), + nnReg.getClusterID(), nnReg.getNamespaceID()); this.bnReg = bnReg; - this.nnReg = nnReg; } @Override public EditLogOutputStream startLogSegment(long txId) throws IOException { - EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, nnReg); + EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, + journalInfo); stm.startLogSegment(txId); return stm; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java index ee08793eaa..1f005b016f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java @@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.protocol.FenceResponse; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; @@ -217,7 +219,8 @@ void stop(boolean reportError) { } /* @Override */// NameNode - public boolean setSafeMode(SafeModeAction action) throws IOException { + public boolean setSafeMode(@SuppressWarnings("unused") SafeModeAction action) + throws IOException { throw new UnsupportedActionException("setSafeMode"); } @@ -236,51 +239,56 @@ private BackupNodeRpcServer(Configuration conf, BackupNode nn) /** * Verifies a journal request - * @param nodeReg node registration - * @throws UnregisteredNodeException if the registration is invalid */ - void verifyJournalRequest(NamenodeRegistration reg) throws IOException { - verifyVersion(reg.getLayoutVersion()); + private void verifyJournalRequest(JournalInfo journalInfo) + throws IOException { + verifyVersion(journalInfo.getLayoutVersion()); String errorMsg = null; int expectedNamespaceID = namesystem.getNamespaceInfo().getNamespaceID(); - if (reg.getNamespaceID() != expectedNamespaceID) { + if (journalInfo.getNamespaceId() != expectedNamespaceID) { errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID - + " actual " + reg.getNamespaceID(); + + " actual " + journalInfo.getNamespaceId(); LOG.warn(errorMsg); - throw new UnregisteredNodeException(reg); + throw new UnregisteredNodeException(journalInfo); } - if (!reg.getClusterID().equals(namesystem.getClusterId())) { + if (!journalInfo.getClusterId().equals(namesystem.getClusterId())) { errorMsg = "Invalid clusterId in journal request - expected " - + reg.getClusterID() + " actual " + namesystem.getClusterId(); + + journalInfo.getClusterId() + " actual " + namesystem.getClusterId(); LOG.warn(errorMsg); - throw new UnregisteredNodeException(reg); + throw new UnregisteredNodeException(journalInfo); } } - ///////////////////////////////////////////////////// // BackupNodeProtocol implementation for backup node. ///////////////////////////////////////////////////// @Override - public void startLogSegment(NamenodeRegistration registration, long txid) - throws IOException { + public void startLogSegment(JournalInfo journalInfo, long epoch, + long txid) throws IOException { namesystem.checkOperation(OperationCategory.JOURNAL); - verifyJournalRequest(registration); + verifyJournalRequest(journalInfo); getBNImage().namenodeStartedLogSegment(txid); } @Override - public void journal(NamenodeRegistration nnReg, - long firstTxId, int numTxns, - byte[] records) throws IOException { + public void journal(JournalInfo journalInfo, long epoch, long firstTxId, + int numTxns, byte[] records) throws IOException { namesystem.checkOperation(OperationCategory.JOURNAL); - verifyJournalRequest(nnReg); + verifyJournalRequest(journalInfo); getBNImage().journal(firstTxId, numTxns, records); } private BackupImage getBNImage() { return (BackupImage)nn.getFSImage(); } + + @Override + public FenceResponse fence(JournalInfo journalInfo, long epoch, + String fencerInfo) throws IOException { + LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch); + throw new UnsupportedOperationException( + "BackupNode does not support fence"); + } } ////////////////////////////////////////////////////// diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java index bdb4c5e773..5a28f7c512 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.io.DataOutputBuffer; @@ -42,18 +43,18 @@ class EditLogBackupOutputStream extends EditLogOutputStream { static int DEFAULT_BUFFER_SIZE = 256; - private JournalProtocol backupNode; // RPC proxy to backup node - private NamenodeRegistration bnRegistration; // backup node registration - private NamenodeRegistration nnRegistration; // active node registration + private final JournalProtocol backupNode; // RPC proxy to backup node + private final NamenodeRegistration bnRegistration; // backup node registration + private final JournalInfo journalInfo; // active node registration + private final DataOutputBuffer out; // serialized output sent to backup node private EditsDoubleBuffer doubleBuf; - private DataOutputBuffer out; // serialized output sent to backup node EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node - NamenodeRegistration nnReg) // active name-node + JournalInfo journalInfo) // active name-node throws IOException { super(); this.bnRegistration = bnReg; - this.nnRegistration = nnReg; + this.journalInfo = journalInfo; InetSocketAddress bnAddress = NetUtils.createSocketAddr(bnRegistration.getAddress()); try { @@ -127,8 +128,7 @@ protected void flushAndSync() throws IOException { out.reset(); assert out.getLength() == 0 : "Output buffer is not empty"; - backupNode.journal(nnRegistration, - firstTxToFlush, numReadyTxns, data); + backupNode.journal(journalInfo, 0, firstTxToFlush, numReadyTxns, data); } } @@ -140,6 +140,6 @@ NamenodeRegistration getRegistration() { } void startLogSegment(long txId) throws IOException { - backupNode.startLogSegment(nnRegistration, txId); + backupNode.startLogSegment(journalInfo, 0, txId); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java new file mode 100644 index 0000000000..5bbd76dd88 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Response to a journal fence request. See {@link JournalProtocol#fence} + */ +@InterfaceAudience.Private +public class FenceResponse { + private final long previousEpoch; + private final long lastTransactionId; + private final boolean isInSync; + + public FenceResponse(long previousEpoch, long lastTransId, boolean inSync) { + this.previousEpoch = previousEpoch; + this.lastTransactionId = lastTransId; + this.isInSync = inSync; + } + + public boolean isInSync() { + return isInSync; + } + + public long getLastTransactionId() { + return lastTransactionId; + } + + public long getPreviousEpoch() { + return previousEpoch; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java new file mode 100644 index 0000000000..2f9f54bd7e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import java.io.IOException; + +/** + * If a previous user of a resource tries to use a shared resource, after + * fenced by another user, this exception is thrown. + */ +public class FencedException extends IOException { + private static final long serialVersionUID = 1L; + + public FencedException(String errorMsg) { + super(errorMsg); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java new file mode 100644 index 0000000000..530934d237 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Information that describes a journal + */ +@InterfaceAudience.Private +public class JournalInfo { + private final int layoutVersion; + private final String clusterId; + private final int namespaceId; + + public JournalInfo(int lv, String clusterId, int nsId) { + this.layoutVersion = lv; + this.clusterId = clusterId; + this.namespaceId = nsId; + } + + public int getLayoutVersion() { + return layoutVersion; + } + + public String getClusterId() { + return clusterId; + } + + public int getNamespaceId() { + return namespaceId; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java index b9d55151f8..be514b96ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java @@ -21,7 +21,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.KerberosInfo; /** @@ -53,12 +52,15 @@ public interface JournalProtocol { * via {@code EditLogBackupOutputStream} in order to synchronize meta-data * changes with the backup namespace image. * - * @param registration active node registration + * @param journalInfo journal information + * @param epoch marks beginning a new journal writer * @param firstTxnId the first transaction of this batch * @param numTxns number of transactions * @param records byte array containing serialized journal records + * @throws FencedException if the resource has been fenced */ - public void journal(NamenodeRegistration registration, + public void journal(JournalInfo journalInfo, + long epoch, long firstTxnId, int numTxns, byte[] records) throws IOException; @@ -66,9 +68,24 @@ public void journal(NamenodeRegistration registration, /** * Notify the BackupNode that the NameNode has rolled its edit logs * and is now writing a new log segment. - * @param registration the registration of the active NameNode + * @param journalInfo journal information + * @param epoch marks beginning a new journal writer * @param txid the first txid in the new log + * @throws FencedException if the resource has been fenced */ - public void startLogSegment(NamenodeRegistration registration, + public void startLogSegment(JournalInfo journalInfo, long epoch, long txid) throws IOException; + + /** + * Request to fence any other journal writers. + * Older writers with at previous epoch will be fenced and can no longer + * perform journal operations. + * + * @param journalInfo journal information + * @param epoch marks beginning a new journal writer + * @param fencerInfo info about fencer for debugging purposes + * @throws FencedException if the resource has been fenced + */ + public FenceResponse fence(JournalInfo journalInfo, long epoch, + String fencerInfo) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java index e3a67edebc..2546873e1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java @@ -21,10 +21,12 @@ import java.io.PrintStream; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hdfs.DFSUtil; @@ -70,7 +72,8 @@ enum Command { EXCLUDE_FILE("-excludeFile", "gets the exclude file path that defines the datanodes " + "that need to decommissioned."), - NNRPCADDRESSES("-nnRpcAddresses", "gets the namenode rpc addresses"); + NNRPCADDRESSES("-nnRpcAddresses", "gets the namenode rpc addresses"), + CONFKEY("-confKey [key]", "gets a specific key from the configuration"); private static Map map; static { @@ -87,6 +90,8 @@ enum Command { new CommandHandler("DFSConfigKeys.DFS_HOSTS_EXCLUDE")); map.put(NNRPCADDRESSES.getName().toLowerCase(), new NNRpcAddressesCommandHandler()); + map.put(CONFKEY.getName().toLowerCase(), + new PrintConfKeyCommandHandler()); } private final String cmd; @@ -98,6 +103,10 @@ enum Command { } public String getName() { + return cmd.split(" ")[0]; + } + + public String getUsage() { return cmd; } @@ -105,8 +114,8 @@ public String getDescription() { return description; } - public static CommandHandler getHandler(String name) { - return map.get(name.toLowerCase()); + public static CommandHandler getHandler(String cmd) { + return map.get(cmd.toLowerCase()); } } @@ -118,7 +127,7 @@ public static CommandHandler getHandler(String name) { StringBuilder usage = new StringBuilder(DESCRIPTION); usage.append("\nhadoop getconf \n"); for (Command cmd : Command.values()) { - usage.append("\t[" + cmd.getName() + "]\t\t\t" + cmd.getDescription() + usage.append("\t[" + cmd.getUsage() + "]\t\t\t" + cmd.getDescription() + "\n"); } USAGE = usage.toString(); @@ -128,7 +137,7 @@ public static CommandHandler getHandler(String name) { * Handler to return value for key corresponding to the {@link Command} */ static class CommandHandler { - final String key; // Configuration key to lookup + String key; // Configuration key to lookup CommandHandler() { this(null); @@ -138,18 +147,30 @@ static class CommandHandler { this.key = key; } - final int doWork(GetConf tool) { + final int doWork(GetConf tool, String[] args) { try { - return doWorkInternal(tool); + checkArgs(args); + + return doWorkInternal(tool, args); } catch (Exception e) { tool.printError(e.getMessage()); } return -1; } + + protected void checkArgs(String args[]) { + if (args.length > 0) { + throw new HadoopIllegalArgumentException( + "Did not expect argument: " + args[0]); + } + } + - /** Method to be overridden by sub classes for specific behavior */ - int doWorkInternal(GetConf tool) throws Exception { - String value = tool.getConf().get(key); + /** Method to be overridden by sub classes for specific behavior + * @param args */ + int doWorkInternal(GetConf tool, String[] args) throws Exception { + + String value = tool.getConf().getTrimmed(key); if (value != null) { tool.printOut(value); return 0; @@ -164,7 +185,7 @@ int doWorkInternal(GetConf tool) throws Exception { */ static class NameNodesCommandHandler extends CommandHandler { @Override - int doWorkInternal(GetConf tool) throws IOException { + int doWorkInternal(GetConf tool, String []args) throws IOException { tool.printMap(DFSUtil.getNNServiceRpcAddresses(tool.getConf())); return 0; } @@ -175,7 +196,7 @@ int doWorkInternal(GetConf tool) throws IOException { */ static class BackupNodesCommandHandler extends CommandHandler { @Override - public int doWorkInternal(GetConf tool) throws IOException { + public int doWorkInternal(GetConf tool, String []args) throws IOException { tool.printMap(DFSUtil.getBackupNodeAddresses(tool.getConf())); return 0; } @@ -186,7 +207,7 @@ public int doWorkInternal(GetConf tool) throws IOException { */ static class SecondaryNameNodesCommandHandler extends CommandHandler { @Override - public int doWorkInternal(GetConf tool) throws IOException { + public int doWorkInternal(GetConf tool, String []args) throws IOException { tool.printMap(DFSUtil.getSecondaryNameNodeAddresses(tool.getConf())); return 0; } @@ -199,7 +220,7 @@ public int doWorkInternal(GetConf tool) throws IOException { */ static class NNRpcAddressesCommandHandler extends CommandHandler { @Override - public int doWorkInternal(GetConf tool) throws IOException { + public int doWorkInternal(GetConf tool, String []args) throws IOException { Configuration config = tool.getConf(); List cnnlist = DFSUtil.flattenAddressMap( DFSUtil.getNNServiceRpcAddresses(config)); @@ -215,6 +236,23 @@ public int doWorkInternal(GetConf tool) throws IOException { } } + static class PrintConfKeyCommandHandler extends CommandHandler { + @Override + protected void checkArgs(String[] args) { + if (args.length != 1) { + throw new HadoopIllegalArgumentException( + "usage: " + Command.CONFKEY.getUsage()); + } + } + + @Override + int doWorkInternal(GetConf tool, String[] args) throws Exception { + this.key = args[0]; + System.err.println("key: " + key); + return super.doWorkInternal(tool, args); + } + } + private final PrintStream out; // Stream for printing command output private final PrintStream err; // Stream for printing error @@ -260,10 +298,11 @@ private void printUsage() { * @return return status of the command */ private int doWork(String[] args) { - if (args.length == 1) { + if (args.length >= 1) { CommandHandler handler = Command.getHandler(args[0]); if (handler != null) { - return handler.doWork(this); + return handler.doWork(this, + Arrays.copyOfRange(args, 1, args.length)); } } printUsage(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.c index 7371caf090..946b31252a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.c @@ -123,6 +123,11 @@ static int errnoFromException(jthrowable exc, JNIEnv *env, goto done; } + if (!strcmp(excClass, "java.lang.UnsupportedOperationException")) { + errnum = ENOTSUP; + goto done; + } + if (!strcmp(excClass, "org.apache.hadoop.security." "AccessControlException")) { errnum = EACCES; @@ -614,8 +619,29 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, } else { file->file = (*env)->NewGlobalRef(env, jVal.l); file->type = (((flags & O_WRONLY) == 0) ? INPUT : OUTPUT); + file->flags = 0; destroyLocalReference(env, jVal.l); + + if ((flags & O_WRONLY) == 0) { + // Try a test read to see if we can do direct reads + errno = 0; + char buf; + if (readDirect(fs, file, &buf, 0) == 0) { + // Success - 0-byte read should return 0 + file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ; + } else { + if (errno != ENOTSUP) { + // Unexpected error. Clear it, don't set the direct flag. + fprintf(stderr, + "WARN: Unexpected error %d when testing " + "for direct read compatibility\n", errno); + errno = 0; + goto done; + } + } + errno = 0; + } } done: @@ -706,10 +732,57 @@ int hdfsExists(hdfsFS fs, const char *path) return jVal.z ? 0 : -1; } +// Checks input file for readiness for reading. +static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f, + jobject* jInputStream) +{ + *jInputStream = (jobject)(f ? f->file : NULL); + //Sanity check + if (!f || f->type == UNINITIALIZED) { + errno = EBADF; + return -1; + } + + //Error checking... make sure that this file is 'readable' + if (f->type != INPUT) { + fprintf(stderr, "Cannot read from a non-InputStream object!\n"); + errno = EINVAL; + return -1; + } + + return 0; +} + +// Common error-handling code between read paths. +static int handleReadResult(int success, jvalue jVal, jthrowable jExc, + JNIEnv* env) +{ + int noReadBytes; + if (success != 0) { + if ((*env)->ExceptionCheck(env)) { + errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." + "FSDataInputStream::read"); + } + noReadBytes = -1; + } else { + noReadBytes = jVal.i; + if (noReadBytes < 0) { + // -1 from Java is EOF, which is 0 here + noReadBytes = 0; + } + errno = 0; + } + + return noReadBytes; +} tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) { + if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) { + return readDirect(fs, f, buffer, length); + } + // JAVA EQUIVALENT: // byte [] bR = new byte[length]; // fis.read(bR); @@ -722,46 +795,26 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) } //Parameters - jobject jInputStream = (jobject)(f ? f->file : NULL); + jobject jInputStream; + if (readPrepare(env, fs, f, &jInputStream) == -1) { + return -1; + } jbyteArray jbRarray; jint noReadBytes = 0; jvalue jVal; jthrowable jExc = NULL; - //Sanity check - if (!f || f->type == UNINITIALIZED) { - errno = EBADF; - return -1; - } - - //Error checking... make sure that this file is 'readable' - if (f->type != INPUT) { - fprintf(stderr, "Cannot read from a non-InputStream object!\n"); - errno = EINVAL; - return -1; - } - //Read the requisite bytes jbRarray = (*env)->NewByteArray(env, length); - if (invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM, - "read", "([B)I", jbRarray) != 0) { - errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." - "FSDataInputStream::read"); - noReadBytes = -1; - } - else { - noReadBytes = jVal.i; - if (noReadBytes > 0) { - (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer); - } else { - //This is a valid case: there aren't any bytes left to read! - if (noReadBytes == 0 || noReadBytes < -1) { - fprintf(stderr, "WARN: FSDataInputStream.read returned invalid return code - libhdfs returning EOF, i.e., 0: %d\n", noReadBytes); - } - noReadBytes = 0; - } - errno = 0; + + int success = invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM, + "read", "([B)I", jbRarray); + + noReadBytes = handleReadResult(success, jVal, jExc, env);; + + if (noReadBytes > 0) { + (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer); } destroyLocalReference(env, jbRarray); @@ -769,6 +822,52 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) return noReadBytes; } +// Reads using the read(ByteBuffer) API, which does fewer copies +tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length) +{ + // JAVA EQUIVALENT: + // ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer + // fis.read(bbuffer); + + //Get the JNIEnv* corresponding to current thread + JNIEnv* env = getJNIEnv(); + if (env == NULL) { + errno = EINTERNAL; + return -1; + } + + jobject jInputStream; + if (readPrepare(env, fs, f, &jInputStream) == -1) { + return -1; + } + + jint noReadBytes = 0; + jvalue jVal; + jthrowable jExc = NULL; + + //Read the requisite bytes + jobject bb = (*env)->NewDirectByteBuffer(env, buffer, length); + if (bb == NULL) { + fprintf(stderr, "Could not allocate ByteBuffer"); + if ((*env)->ExceptionCheck(env)) { + errno = errnoFromException(NULL, env, "JNIEnv::NewDirectByteBuffer"); + } else { + errno = ENOMEM; // Best guess if there's no exception waiting + } + return -1; + } + + int success = invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, + HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I", + bb); + + noReadBytes = handleReadResult(success, jVal, jExc, env); + + destroyLocalReference(env, bb); + + return noReadBytes; +} + tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.h index 0ee29d50ad..67bd288e1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs.h @@ -81,12 +81,16 @@ extern "C" { }; + // Bit fields for hdfsFile_internal flags + #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0) + /** * The 'file-handle' to a file in hdfs. */ struct hdfsFile_internal { void* file; enum hdfsStreamType type; + uint32_t flags; }; typedef struct hdfsFile_internal* hdfsFile; @@ -203,7 +207,6 @@ extern "C" { */ tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length); - /** * hdfsPread - Positional read of data from an open file. * @param fs The configured filesystem handle. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs_test.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs_test.c index 2e6545de5a..21a4f8190e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs_test.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/hdfs_test.c @@ -18,6 +18,8 @@ #include "hdfs.h" +tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length); + void permission_disp(short permissions, char *rtr) { rtr[9] = '\0'; int i; @@ -51,7 +53,6 @@ void permission_disp(short permissions, char *rtr) { } int main(int argc, char **argv) { - hdfsFS fs = hdfsConnectNewInstance("default", 0); if(!fs) { fprintf(stderr, "Oops! Failed to connect to hdfs!\n"); @@ -64,20 +65,25 @@ int main(int argc, char **argv) { exit(-1); } - const char* writePath = "/tmp/testfile.txt"; + const char* writePath = "/tmp/testfile.txt"; + const char* fileContents = "Hello, World!"; + { //Write tests - hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0); if(!writeFile) { fprintf(stderr, "Failed to open %s for writing!\n", writePath); exit(-1); } fprintf(stderr, "Opened %s for writing successfully...\n", writePath); - - char* buffer = "Hello, World!"; - tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1); + tSize num_written_bytes = + hdfsWrite(fs, writeFile, (void*)fileContents, strlen(fileContents)+1); + if (num_written_bytes != strlen(fileContents) + 1) { + fprintf(stderr, "Failed to write correct number of bytes - expected %d, got %d\n", + (int)(strlen(fileContents) + 1), (int)num_written_bytes); + exit(-1); + } fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); tOffset currentPos = -1; @@ -138,18 +144,86 @@ int main(int argc, char **argv) { } fprintf(stderr, "Current position: %ld\n", currentPos); + if ((readFile->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) == 0) { + fprintf(stderr, "Direct read support incorrectly not detected " + "for HDFS filesystem\n"); + exit(-1); + } + + fprintf(stderr, "Direct read support detected for HDFS\n"); + + // Clear flags so that we really go through slow read path + readFile->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ; + static char buffer[32]; tSize num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, sizeof(buffer)); fprintf(stderr, "Read following %d bytes:\n%s\n", num_read_bytes, buffer); + memset(buffer, 0, strlen(fileContents + 1)); + num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer, sizeof(buffer)); fprintf(stderr, "Read following %d bytes:\n%s\n", num_read_bytes, buffer); + if (hdfsSeek(fs, readFile, 0L)) { + fprintf(stderr, + "Failed to seek to file start for direct read test!\n"); + exit(-1); + } + + readFile->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ; + + memset(buffer, 0, strlen(fileContents + 1)); + num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, + sizeof(buffer)); + if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) { + fprintf(stderr, "Failed to read (direct). Expected %s but got %s (%d bytes)\n", + fileContents, buffer, num_read_bytes); + exit(-1); + } + fprintf(stderr, "Read (direct) following %d bytes:\n%s\n", + num_read_bytes, buffer); hdfsCloseFile(fs, readFile); + + // Test correct behaviour for unsupported filesystems + hdfsFile localFile = hdfsOpenFile(lfs, writePath, O_WRONLY|O_CREAT, 0, 0, 0); + if(!localFile) { + fprintf(stderr, "Failed to open %s for writing!\n", writePath); + exit(-1); + } + + tSize num_written_bytes = hdfsWrite(lfs, localFile, + (void*)fileContents, + strlen(fileContents) + 1); + + hdfsCloseFile(lfs, localFile); + localFile = hdfsOpenFile(lfs, writePath, O_RDONLY, 0, 0, 0); + + if (localFile->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) { + fprintf(stderr, "Direct read support incorrectly detected for local " + "filesystem\n"); + exit(-1); + } + + memset(buffer, 0, strlen(fileContents + 1)); + int result = readDirect(lfs, localFile, (void*)buffer, sizeof(buffer)); + if (result != -1) { + fprintf(stderr, "Expected error from local direct read not seen!\n"); + exit(-1); + } + + if (errno != ENOTSUP) { + fprintf(stderr, "Error code not correctly set to ENOTSUP, was %d!\n", + errno); + exit(-1); + } + + fprintf(stderr, "Expected exception thrown for unsupported direct read\n"); + + hdfsCloseFile(lfs, localFile); } int totalResult = 0; @@ -446,4 +520,3 @@ int main(int argc, char **argv) { /** * vim: ts=4: sw=4: et: */ - diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/tests/test-libhdfs.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/tests/test-libhdfs.sh index c2e4fa5693..51bb15f45d 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/tests/test-libhdfs.sh +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/tests/test-libhdfs.sh @@ -17,126 +17,64 @@ # # -# Note: This script depends on 8 environment variables to function correctly: -# a) CLASSPATH -# b) HADOOP_PREFIX -# c) HADOOP_CONF_DIR -# d) HADOOP_LOG_DIR -# e) LIBHDFS_BUILD_DIR -# f) LIBHDFS_INSTALL_DIR -# g) OS_NAME -# h) CLOVER_JAR -# i} HADOOP_VERSION -# j) HADOOP_HDFS_HOME -# All these are passed by build.xml. +# Note: This script depends on 5 environment variables to function correctly: +# a) HADOOP_HOME - must be set +# b) HDFS_TEST_CONF_DIR - optional; the directory to read and write +# core-site.xml to. Defaults to /tmp +# c) LIBHDFS_BUILD_DIR - optional; the location of the hdfs_test +# executable. Defaults to the parent directory. +# d) OS_NAME - used to choose how to locate libjvm.so +# e) CLOVER_JAR - optional; the location of the Clover code coverage tool's jar. # +if [ "x$HADOOP_HOME" == "x" ]; then + echo "HADOOP_HOME is unset!" + exit 1 +fi + +if [ "x$LIBHDFS_BUILD_DIR" == "x" ]; then + LIBHDFS_BUILD_DIR=`pwd`/../ +fi + +if [ "x$HDFS_TEST_CONF_DIR" == "x" ]; then + HDFS_TEST_CONF_DIR=/tmp +fi + +# LIBHDFS_INSTALL_DIR is the directory containing libhdfs.so +LIBHDFS_INSTALL_DIR=$HADOOP_HOME/lib/native/ HDFS_TEST=hdfs_test -HADOOP_LIB_DIR=$HADOOP_PREFIX/lib -HADOOP_BIN_DIR=$HADOOP_PREFIX/bin -COMMON_BUILD_DIR=$HADOOP_PREFIX/build/ivy/lib/hadoop-hdfs/common -COMMON_JAR=$COMMON_BUILD_DIR/hadoop-common-$HADOOP_VERSION.jar +HDFS_TEST_JAR=`find $HADOOP_HOME/share/hadoop/hdfs/ \ +-name "hadoop-hdfs-*-tests.jar" | head -n 1` -cat > $HADOOP_CONF_DIR/core-site.xml < - - - - hadoop.tmp.dir - file:///$LIBHDFS_TEST_DIR - - - fs.default.name - hdfs://localhost:23000/ - - -EOF - -cat > $HADOOP_CONF_DIR/hdfs-site.xml < - - - - dfs.replication - 1 - - - dfs.support.append - true - - - dfs.namenode.logging.level - DEBUG - - -EOF - -cat > $HADOOP_CONF_DIR/slaves < /tmp/libhdfs-test-cluster.out 2>&1 & + +MINI_CLUSTER_PID=$! +for i in {1..15}; do + echo "Waiting for DFS cluster, attempt $i of 15" + [ -f $HDFS_TEST_CONF_DIR/core-site.xml ] && break; + sleep 2 +done + +if [ ! -f $HDFS_TEST_CONF_DIR/core-site.xml ]; then + echo "Cluster did not come up in 30s" + kill -9 $MINI_CLUSTER_PID + exit 1 fi -echo exiting with $BUILD_STATUS +echo "Cluster up, running tests" +# Disable error checking to make sure we get to cluster cleanup +set +e + +CLASSPATH=$CLASSPATH \ +LD_PRELOAD="$LIB_JVM_DIR/libjvm.so:$LIBHDFS_INSTALL_DIR/libhdfs.so:" \ +$LIBHDFS_BUILD_DIR/$HDFS_TEST + +BUILD_STATUS=$? + +echo "Tearing cluster down" +kill -9 $MINI_CLUSTER_PID +echo "Exiting with $BUILD_STATUS" exit $BUILD_STATUS diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto index c15347190e..1e720bab05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto @@ -36,16 +36,18 @@ message JournalInfoProto { } /** - * JournalInfo - the information about the journal + * journalInfo - the information about the journal * firstTxnId - the first txid in the journal records * numTxns - Number of transactions in editlog * records - bytes containing serialized journal records + * epoch - change to this represents change of journal writer */ message JournalRequestProto { required JournalInfoProto journalInfo = 1; required uint64 firstTxnId = 2; required uint32 numTxns = 3; required bytes records = 4; + required uint64 epoch = 5; } /** @@ -55,12 +57,13 @@ message JournalResponseProto { } /** - * JournalInfo - the information about the journal + * journalInfo - the information about the journal * txid - first txid in the new log */ message StartLogSegmentRequestProto { - required JournalInfoProto journalInfo = 1; - required uint64 txid = 2; + required JournalInfoProto journalInfo = 1; // Info about the journal + required uint64 txid = 2; // Transaction ID + required uint64 epoch = 3; } /** @@ -69,6 +72,27 @@ message StartLogSegmentRequestProto { message StartLogSegmentResponseProto { } +/** + * journalInfo - the information about the journal + * txid - first txid in the new log + */ +message FenceRequestProto { + required JournalInfoProto journalInfo = 1; // Info about the journal + required uint64 epoch = 2; // Epoch - change indicates change in writer + optional string fencerInfo = 3; // Info about fencer for debugging +} + +/** + * previousEpoch - previous epoch if any or zero + * lastTransactionId - last valid transaction Id in the journal + * inSync - if all journal segments are available and in sync + */ +message FenceResponseProto { + optional uint64 previousEpoch = 1; + optional uint64 lastTransactionId = 2; + optional bool inSync = 3; +} + /** * Protocol used to journal edits to a remote node. Currently, * this is used to publish edits from the NameNode to a BackupNode. @@ -89,4 +113,10 @@ service JournalProtocolService { */ rpc startLogSegment(StartLogSegmentRequestProto) returns (StartLogSegmentResponseProto); + + /** + * Request to fence a journal receiver. + */ + rpc fence(FenceRequestProto) + returns (FenceResponseProto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsDefaultValue.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsDefaultValue.java new file mode 100644 index 0000000000..f3a81ac9c0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsDefaultValue.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.viewfs; + + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import javax.security.auth.login.LoginException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.FsConstants; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; + +/** + * Tests for viewfs implementation of default fs level values. + * This tests for both passing in a path (based on mount point) + * to obtain the default value of the fs that the path is mounted on + * or just passing in no arguments. + */ +public class TestViewFsDefaultValue { + + static final String testFileDir = "/tmp/test/"; + static final String testFileName = testFileDir + "testFileStatusSerialziation"; + private static MiniDFSCluster cluster; + private static Configuration CONF = new Configuration(); + private static FileSystem fHdfs; + private static FileSystem vfs; + private static Path testFilePath; + private static Path testFileDirPath; + + @BeforeClass + public static void clusterSetupAtBegining() throws IOException, + LoginException, URISyntaxException { + + CONF.setLong(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT); + CONF.setInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT); + CONF.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, + DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); + CONF.setInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT + 1); + CONF.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT); + + cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DFS_REPLICATION_DEFAULT + 1).build(); + cluster.waitClusterUp(); + fHdfs = cluster.getFileSystem(); + FileSystemTestHelper.createFile(fHdfs, testFileName); + Configuration conf = ViewFileSystemTestSetup.createConfig(); + ConfigUtil.addLink(conf, "/tmp", new URI(fHdfs.getUri().toString() + + "/tmp")); + vfs = FileSystem.get(FsConstants.VIEWFS_URI, conf); + testFileDirPath = new Path (testFileDir); + testFilePath = new Path (testFileName); + } + + + /** + * Test that default blocksize values can be retrieved on the client side. + */ + @Test + public void testGetDefaultBlockSize() + throws IOException, URISyntaxException { + // createFile does not use defaultBlockSize to create the file, + // but we are only looking at the defaultBlockSize, so this + // test should still pass + try { + vfs.getDefaultBlockSize(); + fail("getServerDefaults on viewFs did not throw excetion!"); + } catch (NotInMountpointException e) { + assertEquals(vfs.getDefaultBlockSize(testFilePath), + DFS_BLOCK_SIZE_DEFAULT); + } + } + + /** + * Test that default replication values can be retrieved on the client side. + */ + @Test + public void testGetDefaultReplication() + throws IOException, URISyntaxException { + try { + vfs.getDefaultReplication(); + fail("getDefaultReplication on viewFs did not throw excetion!"); + } catch (NotInMountpointException e) { + assertEquals(vfs.getDefaultReplication(testFilePath), + DFS_REPLICATION_DEFAULT+1); + } + } + + + /** + * Test that server default values can be retrieved on the client side. + */ + @Test + public void testServerDefaults() throws IOException { + try { + FsServerDefaults serverDefaults = vfs.getServerDefaults(); + fail("getServerDefaults on viewFs did not throw excetion!"); + } catch (NotInMountpointException e) { + FsServerDefaults serverDefaults = vfs.getServerDefaults(testFilePath); + assertEquals(DFS_BLOCK_SIZE_DEFAULT, serverDefaults.getBlockSize()); + assertEquals(DFS_BYTES_PER_CHECKSUM_DEFAULT, + serverDefaults.getBytesPerChecksum()); + assertEquals(DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT, + serverDefaults.getWritePacketSize()); + assertEquals(IO_FILE_BUFFER_SIZE_DEFAULT, + serverDefaults.getFileBufferSize()); + assertEquals(DFS_REPLICATION_DEFAULT + 1, + serverDefaults.getReplication()); + } + } + + /** + * Test that getContentSummary can be retrieved on the client side. + */ + @Test + public void testGetContentSummary() throws IOException { + FileSystem hFs = cluster.getFileSystem(0); + final DistributedFileSystem dfs = (DistributedFileSystem)hFs; + dfs.setQuota(testFileDirPath, 100, 500); + ContentSummary cs = vfs.getContentSummary(testFileDirPath); + assertEquals(100, cs.getQuota()); + assertEquals(500, cs.getSpaceQuota()); + } + + @AfterClass + public static void cleanup() throws IOException { + fHdfs.delete(new Path(testFileName), true); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsFileStatusHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsFileStatusHdfs.java index 12a9ff378b..74c32d9c72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsFileStatusHdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsFileStatusHdfs.java @@ -30,6 +30,7 @@ import javax.security.auth.login.LoginException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemTestHelper; @@ -48,13 +49,13 @@ public class TestViewFsFileStatusHdfs { static final String testfilename = "/tmp/testFileStatusSerialziation"; + static final String someFile = "/hdfstmp/someFileForTestGetFileChecksum"; - - private static MiniDFSCluster cluster; private static Path defaultWorkingDirectory; private static Configuration CONF = new Configuration(); private static FileSystem fHdfs; + private static FileSystem vfs; @BeforeClass public static void clusterSetupAtBegining() throws IOException, @@ -65,18 +66,19 @@ public static void clusterSetupAtBegining() throws IOException, defaultWorkingDirectory = fHdfs.makeQualified( new Path("/user/" + UserGroupInformation.getCurrentUser().getShortUserName())); fHdfs.mkdirs(defaultWorkingDirectory); + + // Setup the ViewFS to be used for all tests. + Configuration conf = ViewFileSystemTestSetup.createConfig(); + ConfigUtil.addLink(conf, "/vfstmp", new URI(fHdfs.getUri() + "/hdfstmp")); + ConfigUtil.addLink(conf, "/tmp", new URI(fHdfs.getUri() + "/tmp")); + vfs = FileSystem.get(FsConstants.VIEWFS_URI, conf); + assertEquals(ViewFileSystem.class, vfs.getClass()); } @Test public void testFileStatusSerialziation() throws IOException, URISyntaxException { - long len = FileSystemTestHelper.createFile(fHdfs, testfilename); - - Configuration conf = ViewFileSystemTestSetup.createConfig(); - ConfigUtil.addLink(conf, "/tmp", new URI(fHdfs.getUri().toString() + "/tmp")); - FileSystem vfs = FileSystem.get(FsConstants.VIEWFS_URI, conf); - assertEquals(ViewFileSystem.class, vfs.getClass()); FileStatus stat = vfs.getFileStatus(new Path(testfilename)); assertEquals(len, stat.getLen()); // check serialization/deserialization @@ -89,9 +91,34 @@ public void testFileStatusSerialziation() assertEquals(len, deSer.getLen()); } + @Test + public void testGetFileChecksum() throws IOException, URISyntaxException { + // Create two different files in HDFS + FileSystemTestHelper.createFile(fHdfs, someFile); + FileSystemTestHelper.createFile(fHdfs, FileSystemTestHelper + .getTestRootPath(fHdfs, someFile + "other"), 1, 512); + // Get checksum through ViewFS + FileChecksum viewFSCheckSum = vfs.getFileChecksum( + new Path("/vfstmp/someFileForTestGetFileChecksum")); + // Get checksum through HDFS. + FileChecksum hdfsCheckSum = fHdfs.getFileChecksum( + new Path(someFile)); + // Get checksum of different file in HDFS + FileChecksum otherHdfsFileCheckSum = fHdfs.getFileChecksum( + new Path(someFile+"other")); + // Checksums of the same file (got through HDFS and ViewFS should be same) + assertEquals("HDFS and ViewFS checksums were not the same", viewFSCheckSum, + hdfsCheckSum); + // Checksum of different files should be different. + assertFalse("Some other HDFS file which should not have had the same " + + "checksum as viewFS did!", viewFSCheckSum.equals(otherHdfsFileCheckSum)); + } + @AfterClass public static void cleanup() throws IOException { fHdfs.delete(new Path(testfilename), true); + fHdfs.delete(new Path(someFile), true); + fHdfs.delete(new Path(someFile + "other"), true); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java index 03c511f319..ab3ce9fee3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java @@ -20,12 +20,18 @@ import java.io.IOException; import java.net.InetSocketAddress; +import junit.framework.Assert; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.FenceResponse; +import org.apache.hadoop.hdfs.server.protocol.FencedException; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.junit.Test; import org.mockito.Mockito; @@ -42,7 +48,7 @@ public class TestJournalService { * called. */ @Test - public void testCallBacks() throws IOException { + public void testCallBacks() throws Exception { JournalListener listener = Mockito.mock(JournalListener.class); JournalService service = null; try { @@ -51,6 +57,7 @@ public void testCallBacks() throws IOException { service = startJournalService(listener); verifyRollLogsCallback(service, listener); verifyJournalCallback(service, listener); + verifyFence(service, cluster.getNameNode(0)); } finally { if (service != null) { service.stop(); @@ -93,4 +100,28 @@ private void verifyJournalCallback(JournalService s, JournalListener l) throws I Mockito.verify(l, Mockito.atLeastOnce()).journal(Mockito.eq(s), Mockito.anyLong(), Mockito.anyInt(), (byte[]) Mockito.any()); } + + public void verifyFence(JournalService s, NameNode nn) throws Exception { + String cid = nn.getNamesystem().getClusterId(); + int nsId = nn.getNamesystem().getFSImage().getNamespaceID(); + int lv = nn.getNamesystem().getFSImage().getLayoutVersion(); + + // Fence the journal service + JournalInfo info = new JournalInfo(lv, cid, nsId); + long currentEpoch = s.getEpoch(); + + // New epoch lower than the current epoch is rejected + try { + s.fence(info, (currentEpoch - 1), "fencer"); + } catch (FencedException ignore) { /* Ignored */ } + + // New epoch equal to the current epoch is rejected + try { + s.fence(info, currentEpoch, "fencer"); + } catch (FencedException ignore) { /* Ignored */ } + + // New epoch higher than the current epoch is successful + FenceResponse resp = s.fence(info, currentEpoch+1, "fencer"); + Assert.assertNotNull(resp); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java index 97be2b843d..93de1d2e5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestGetConf.java @@ -42,6 +42,8 @@ import org.apache.hadoop.util.ToolRunner; import org.junit.Test; +import com.google.common.base.Joiner; + /** * Test for {@link GetConf} */ @@ -117,7 +119,12 @@ private String runTool(HdfsConfiguration conf, String[] args, boolean success) PrintStream out = new PrintStream(o, true); try { int ret = ToolRunner.run(new GetConf(conf, out, out), args); - assertEquals(success, ret == 0); + out.flush(); + System.err.println("Output: " + o.toString()); + assertEquals("Expected " + (success?"success":"failure") + + " for args: " + Joiner.on(" ").join(args) + "\n" + + "Output: " + o.toString(), + success, ret == 0); return o.toString(); } finally { o.close(); @@ -222,7 +229,9 @@ public void testEmptyConf() throws Exception { getAddressListFromTool(TestType.SECONDARY, conf, false); getAddressListFromTool(TestType.NNRPCADDRESSES, conf, false); for (Command cmd : Command.values()) { - CommandHandler handler = Command.getHandler(cmd.getName()); + String arg = cmd.getName(); + CommandHandler handler = Command.getHandler(arg); + assertNotNull("missing handler: " + cmd, handler); if (handler.key != null) { // First test with configuration missing the required key String[] args = {handler.key}; @@ -319,18 +328,36 @@ public void testFederation() throws Exception { verifyAddresses(conf, TestType.SECONDARY, false, secondaryAddresses); verifyAddresses(conf, TestType.NNRPCADDRESSES, true, nnAddresses); } + + @Test + public void testGetSpecificKey() throws Exception { + HdfsConfiguration conf = new HdfsConfiguration(); + conf.set("mykey", " myval "); + String[] args = {"-confKey", "mykey"}; + assertTrue(runTool(conf, args, true).equals("myval\n")); + } + + @Test + public void testExtraArgsThrowsError() throws Exception { + HdfsConfiguration conf = new HdfsConfiguration(); + conf.set("mykey", "myval"); + String[] args = {"-namenodes", "unexpected-arg"}; + assertTrue(runTool(conf, args, false).contains( + "Did not expect argument: unexpected-arg")); + } /** * Tests commands other than {@link Command#NAMENODE}, {@link Command#BACKUP}, * {@link Command#SECONDARY} and {@link Command#NNRPCADDRESSES} */ + @Test public void testTool() throws Exception { HdfsConfiguration conf = new HdfsConfiguration(false); for (Command cmd : Command.values()) { CommandHandler handler = Command.getHandler(cmd.getName()); - if (handler.key != null) { + if (handler.key != null && !"-confKey".equals(cmd.getName())) { // Add the key to the conf and ensure tool returns the right value - String[] args = {handler.key}; + String[] args = {cmd.getName()}; conf.set(handler.key, "value"); assertTrue(runTool(conf, args, true).contains("value")); } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 021d326fd4..8c66a1c74c 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -204,7 +204,20 @@ Release 2.0.0 - UNRELEASED MAPREDUCE-4097. tools testcases fail because missing mrapp-generated-classpath file in classpath (rvs via tucu) - + + MAPREDUCE-4113. Fix tests org.apache.hadoop.mapred.TestClusterMRNotification + (Devaraj K via bobby) + + MAPREDUCE-4112. Fix tests org.apache.hadoop.mapred.TestClusterMapReduceTestCase + (Devaraj K via bobby) + + MAPREDUCE-4111. Fix tests in org.apache.hadoop.mapred.TestJobName (Devaraj + K via bobby) + + MAPREDUCE-4110. Fix tests in org.apache.hadoop.mapred.TestMiniMRClasspath & + org.apache.hadoop.mapred.TestMiniMRWithDFSWithDistinctUsers (Devaraj K via + bobby) + Release 0.23.3 - UNRELEASED INCOMPATIBLE CHANGES @@ -264,6 +277,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4073. CS assigns multiple off-switch containers when using multi-level-queues (Siddharth Seth via bobby) + MAPREDUCE-4051. Remove the empty hadoop-mapreduce-project/assembly/all.xml + file (Ravi Prakash via bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/assembly/all.xml b/hadoop-mapreduce-project/assembly/all.xml deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClusterMRNotification.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClusterMRNotification.java index cedbb50877..019fc1febc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClusterMRNotification.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClusterMRNotification.java @@ -20,12 +20,9 @@ import java.io.IOException; -import org.junit.Ignore; - /** * Tests Job end notification in cluster mode. */ -@Ignore public class TestClusterMRNotification extends NotificationTestCase { public TestClusterMRNotification() throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java index 175cbc609b..ada2d0c634 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java @@ -17,15 +17,18 @@ */ package org.apache.hadoop.mapred; +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.Properties; + import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.junit.Ignore; - -import java.io.*; -import java.util.Properties; -@Ignore public class TestClusterMapReduceTestCase extends ClusterMapReduceTestCase { public void _testMapReduce(boolean restart) throws Exception { OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt")); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobName.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobName.java index 9655dc57e7..4b62b4a1d8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobName.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobName.java @@ -18,23 +18,17 @@ package org.apache.hadoop.mapred; import java.io.BufferedReader; -import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; -import java.util.Iterator; -import java.util.StringTokenizer; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.serializer.JavaSerializationComparator; import org.apache.hadoop.mapred.lib.IdentityMapper; -import org.junit.Ignore; -@Ignore public class TestJobName extends ClusterMapReduceTestCase { public void testComplexName() throws Exception { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClasspath.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClasspath.java index 9f8b4a7390..6a81a41552 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClasspath.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRClasspath.java @@ -18,47 +18,43 @@ package org.apache.hadoop.mapred; -import java.io.*; +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; import java.net.URI; import junit.framework.TestCase; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; -import org.junit.Ignore; +import org.junit.Assert; +import org.junit.Test; /** * A JUnit test to test Mini Map-Reduce Cluster with multiple directories * and check for correct classpath */ -@Ignore -public class TestMiniMRClasspath extends TestCase { +public class TestMiniMRClasspath { - static void configureWordCount(FileSystem fs, - String jobTracker, - JobConf conf, - String input, - int numMaps, - int numReduces, - Path inDir, Path outDir) throws IOException { + static void configureWordCount(FileSystem fs, JobConf conf, String input, + int numMaps, int numReduces, Path inDir, Path outDir) throws IOException { fs.delete(outDir, true); if (!fs.mkdirs(inDir)) { throw new IOException("Mkdirs failed to create " + inDir.toString()); } - { - DataOutputStream file = fs.create(new Path(inDir, "part-0")); - file.writeBytes(input); - file.close(); - } + DataOutputStream file = fs.create(new Path(inDir, "part-0")); + file.writeBytes(input); + file.close(); FileSystem.setDefaultUri(conf, fs.getUri()); - conf.set(JTConfig.FRAMEWORK_NAME, JTConfig.CLASSIC_FRAMEWORK_NAME); - conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker); + conf.set(JTConfig.FRAMEWORK_NAME, JTConfig.YARN_FRAMEWORK_NAME); conf.setJobName("wordcount"); conf.setInputFormat(TextInputFormat.class); @@ -74,18 +70,17 @@ static void configureWordCount(FileSystem fs, FileOutputFormat.setOutputPath(conf, outDir); conf.setNumMapTasks(numMaps); conf.setNumReduceTasks(numReduces); - //pass a job.jar already included in the hadoop build - conf.setJar("build/test/mapred/testjar/testjob.jar"); + //set the tests jar file + conf.setJarByClass(TestMiniMRClasspath.class); } - static String launchWordCount(URI fileSys, String jobTracker, JobConf conf, - String input, int numMaps, int numReduces) + static String launchWordCount(URI fileSys, JobConf conf, String input, + int numMaps, int numReduces) throws IOException { final Path inDir = new Path("/testing/wc/input"); final Path outDir = new Path("/testing/wc/output"); FileSystem fs = FileSystem.get(fileSys, conf); - configureWordCount(fs, jobTracker, conf, input, numMaps, numReduces, inDir, - outDir); + configureWordCount(fs, conf, input, numMaps, numReduces, inDir, outDir); JobClient.runJob(conf); StringBuffer result = new StringBuffer(); { @@ -107,8 +102,8 @@ static String launchWordCount(URI fileSys, String jobTracker, JobConf conf, return result.toString(); } - static String launchExternal(URI uri, String jobTracker, JobConf conf, - String input, int numMaps, int numReduces) + static String launchExternal(URI uri, JobConf conf, String input, + int numMaps, int numReduces) throws IOException { final Path inDir = new Path("/testing/ext/input"); @@ -124,8 +119,7 @@ static String launchExternal(URI uri, String jobTracker, JobConf conf, file.close(); } FileSystem.setDefaultUri(conf, uri); - conf.set(JTConfig.FRAMEWORK_NAME, JTConfig.CLASSIC_FRAMEWORK_NAME); - conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker); + conf.set(JTConfig.FRAMEWORK_NAME, JTConfig.YARN_FRAMEWORK_NAME); conf.setJobName("wordcount"); conf.setInputFormat(TextInputFormat.class); @@ -142,8 +136,8 @@ static String launchExternal(URI uri, String jobTracker, JobConf conf, conf.set("mapred.mapper.class", "testjar.ExternalMapperReducer"); conf.set("mapred.reducer.class", "testjar.ExternalMapperReducer"); - //pass a job.jar already included in the hadoop build - conf.setJar("build/test/mapred/testjar/testjob.jar"); + // set the tests jar file + conf.setJarByClass(TestMiniMRClasspath.class); JobClient.runJob(conf); StringBuffer result = new StringBuffer(); Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir, @@ -164,6 +158,7 @@ static String launchExternal(URI uri, String jobTracker, JobConf conf, return result.toString(); } + @Test public void testClassPath() throws IOException { String namenode = null; MiniDFSCluster dfs = null; @@ -180,13 +175,10 @@ public void testClassPath() throws IOException { mr = new MiniMRCluster(taskTrackers, namenode, 3); JobConf jobConf = new JobConf(); String result; - final String jobTrackerName = "localhost:" + mr.getJobTrackerPort(); - result = launchWordCount(fileSys.getUri(), jobTrackerName, jobConf, - "The quick brown fox\nhas many silly\n" + - "red fox sox\n", - 3, 1); - assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" + - "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result); + result = launchWordCount(fileSys.getUri(), jobConf, + "The quick brown fox\nhas many silly\n" + "red fox sox\n", 3, 1); + Assert.assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" + + "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result); } finally { if (dfs != null) { dfs.shutdown(); } @@ -195,6 +187,7 @@ public void testClassPath() throws IOException { } } + @Test public void testExternalWritable() throws IOException { @@ -214,12 +207,10 @@ public void testExternalWritable() mr = new MiniMRCluster(taskTrackers, namenode, 3); JobConf jobConf = new JobConf(); String result; - final String jobTrackerName = "localhost:" + mr.getJobTrackerPort(); - result = launchExternal(fileSys.getUri(), jobTrackerName, jobConf, - "Dennis was here!\nDennis again!", - 3, 1); - assertEquals("Dennis again!\t1\nDennis was here!\t1\n", result); + result = launchExternal(fileSys.getUri(), jobConf, + "Dennis was here!\nDennis again!", 3, 1); + Assert.assertEquals("Dennis again!\t1\nDennis was here!\t1\n", result); } finally { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java index 7ebf8c7e77..041f2dbfbf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java @@ -17,26 +17,26 @@ */ package org.apache.hadoop.mapred; -import java.io.*; +import java.io.IOException; import java.security.PrivilegedExceptionAction; -import junit.framework.TestCase; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; -import org.apache.hadoop.security.*; -import org.junit.Ignore; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; /** * A JUnit test to test Mini Map-Reduce Cluster with Mini-DFS. */ -@Ignore -public class TestMiniMRWithDFSWithDistinctUsers extends TestCase { +public class TestMiniMRWithDFSWithDistinctUsers { static final UserGroupInformation DFS_UGI = createUGI("dfs", true); static final UserGroupInformation ALICE_UGI = createUGI("alice", false); static final UserGroupInformation BOB_UGI = createUGI("bob", false); @@ -45,7 +45,6 @@ public class TestMiniMRWithDFSWithDistinctUsers extends TestCase { MiniDFSCluster dfs = null; FileSystem fs = null; Configuration conf = new Configuration(); - String jobTrackerName; static UserGroupInformation createUGI(String name, boolean issuper) { String group = issuper? "supergroup": name; @@ -71,9 +70,10 @@ public RunningJob run() throws IOException { }); rj.waitForCompletion(); - assertEquals("SUCCEEDED", JobStatus.getJobRunState(rj.getJobState())); + Assert.assertEquals("SUCCEEDED", JobStatus.getJobRunState(rj.getJobState())); } + @Before public void setUp() throws Exception { dfs = new MiniDFSCluster(conf, 4, true, null); @@ -96,29 +96,30 @@ public FileSystem run() throws IOException { mr = new MiniMRCluster(0, 0, 4, dfs.getFileSystem().getUri().toString(), 1, null, null, MR_UGI, mrConf); - jobTrackerName = "localhost:" + mr.getJobTrackerPort(); } + @After public void tearDown() throws Exception { if (mr != null) { mr.shutdown();} if (dfs != null) { dfs.shutdown(); } } + @Test public void testDistinctUsers() throws Exception { JobConf job1 = mr.createJobConf(); String input = "The quick brown fox\nhas many silly\n" + "red fox sox\n"; Path inDir = new Path("/testing/distinct/input"); Path outDir = new Path("/user/alice/output"); - TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1, - input, 2, 1, inDir, outDir); + TestMiniMRClasspath + .configureWordCount(fs, job1, input, 2, 1, inDir, outDir); runJobAsUser(job1, ALICE_UGI); JobConf job2 = mr.createJobConf(); Path inDir2 = new Path("/testing/distinct/input2"); Path outDir2 = new Path("/user/bob/output2"); - TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job2, - input, 2, 1, inDir2, outDir2); + TestMiniMRClasspath.configureWordCount(fs, job2, input, 2, 1, inDir2, + outDir2); runJobAsUser(job2, BOB_UGI); } @@ -127,6 +128,7 @@ public void testDistinctUsers() throws Exception { * task makes lots of spills (more than fit in the spill index cache) * that it will succeed. */ + @Test public void testMultipleSpills() throws Exception { JobConf job1 = mr.createJobConf(); @@ -141,8 +143,8 @@ public void testMultipleSpills() throws Exception { + "red fox sox\n"; Path inDir = new Path("/testing/distinct/input"); Path outDir = new Path("/user/alice/output"); - TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1, - input, 2, 1, inDir, outDir); + TestMiniMRClasspath + .configureWordCount(fs, job1, input, 2, 1, inDir, outDir); runJobAsUser(job1, ALICE_UGI); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerTokenPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerTokenPBImpl.java index d60ed8ca01..e85f33318e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerTokenPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerTokenPBImpl.java @@ -155,4 +155,12 @@ public synchronized void setService(String service) { builder.setService((service)); } + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("ContainerToken { "); + sb.append("kind: ").append(getKind()).append(", "); + sb.append("service: ").append(getService()).append(" }"); + return sb.toString(); + } }