HDFS-5404. Resolve regressions in Windows compatibility on HDFS-4949 branch. Contributed by Chris Nauroth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1535217 13f79535-47bb-0310-9956-ffa450edef68
Andrew Wang 2013-10-24 00:08:15 +00:00
parent 69e5f90e9f
commit 4004a42d53
4 changed files with 84 additions and 66 deletions


@@ -383,6 +383,7 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_mlock_1native(
   JNIEnv *env, jclass clazz,
   jobject buffer, jlong len)
 {
+#ifdef UNIX
   void* buf = (void*)(*env)->GetDirectBufferAddress(env, buffer);
   PASS_EXCEPTIONS(env);
 
@@ -390,6 +391,12 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_mlock_1native(
     CHECK_DIRECT_BUFFER_ADDRESS(buf);
     throw_ioe(env, errno);
   }
+#endif
+
+#ifdef WINDOWS
+  THROW(env, "java/io/IOException",
+    "The function POSIX.mlock_native() is not supported on Windows");
+#endif
 }
 
 /**
@@ -404,6 +411,7 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_munlock_1native(
   JNIEnv *env, jclass clazz,
   jobject buffer, jlong len)
 {
+#ifdef UNIX
   void* buf = (void*)(*env)->GetDirectBufferAddress(env, buffer);
   PASS_EXCEPTIONS(env);
 
@@ -411,6 +419,12 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_munlock_1native(
     CHECK_DIRECT_BUFFER_ADDRESS(buf);
     throw_ioe(env, errno);
   }
+#endif
+
+#ifdef WINDOWS
+  THROW(env, "java/io/IOException",
+    "The function POSIX.munlock_native() is not supported on Windows");
+#endif
 }
 
 #ifdef __FreeBSD__
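The native change above guards the POSIX-only bodies of mlock_native() and munlock_native() with #ifdef UNIX and adds an #ifdef WINDOWS branch that raises java.io.IOException, so a Windows JVM sees a clean exception rather than an unresolvable mlock(2)/munlock(2) call. Below is a minimal sketch of what the Java side of such a guard can look like; the class and method names are illustrative, not Hadoop's actual NativeIO.POSIX API.

import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical JNI wrapper; Hadoop's real entry point lives in
// org.apache.hadoop.io.nativeio.NativeIO.POSIX.
public class MlockWrapper {
  // Backed by a C function like the one patched above. On Windows, the
  // native body throws IOException instead of calling mlock(2).
  private static native void mlockNative(ByteBuffer buffer, long len)
      throws IOException;

  public static void mlock(ByteBuffer buffer, long len) throws IOException {
    if (!buffer.isDirect()) {
      // mlock pins physical pages, so only direct buffers with a stable
      // native address make sense here.
      throw new IOException("Cannot mlock a non-direct ByteBuffer");
    }
    mlockNative(buffer, len);
  }
}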


@@ -115,3 +115,6 @@ HDFS-4949 (Unreleased)
     HDFS-5385. Caching RPCs are AtMostOnce, but do not persist client ID and
     call ID to edit log. (Chris Nauroth via Colin Patrick McCabe)
+
+    HDFS-5404. Resolve regressions in Windows compatibility on HDFS-4949
+    branch. (Chris Nauroth via Andrew Wang)


@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyInt;
@@ -50,6 +51,7 @@
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Before;
@@ -72,6 +74,8 @@ public class TestFsDatasetCache {
 
   @Before
   public void setUp() throws Exception {
+    assumeTrue(!Path.WINDOWS);
+    assumeTrue(NativeIO.isAvailable());
     conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
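The two assumeTrue() calls added to setUp() are the standard JUnit 4 idiom for skipping tests on unsupported platforms: a failed assumption in a @Before method marks each test in the class as skipped rather than failed, which keeps Windows runs green without hiding real failures elsewhere. A self-contained sketch of the idiom, substituting a plain os.name check for Hadoop's Path.WINDOWS constant:

import static org.junit.Assume.assumeTrue;

import org.junit.Before;
import org.junit.Test;

public class PlatformGuardedTest {
  @Before
  public void setUp() {
    // Runs before every test; if the assumption fails, JUnit reports the
    // test as skipped instead of executing it and failing.
    assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
  }

  @Test
  public void testThatNeedsMlock() {
    // Only reached on platforms where setUp()'s assumption held.
  }
}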


@@ -447,77 +447,74 @@ public void testAddRemoveDirectives() throws Exception {
   @Test(timeout=60000)
   public void testCacheManagerRestart() throws Exception {
+    cluster.shutdown();
+    cluster = null;
     HdfsConfiguration conf = createCachingConf();
-    MiniDFSCluster cluster =
-      new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
-    try {
-      cluster.waitActive();
-      DistributedFileSystem dfs = cluster.getFileSystem();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+    cluster.waitActive();
+    DistributedFileSystem dfs = cluster.getFileSystem();
 
-      // Create and validate a pool
-      final String pool = "poolparty";
-      String groupName = "partygroup";
-      FsPermission mode = new FsPermission((short)0777);
-      int weight = 747;
-      dfs.addCachePool(new CachePoolInfo(pool)
-          .setGroupName(groupName)
-          .setMode(mode)
-          .setWeight(weight));
-      RemoteIterator<CachePoolInfo> pit = dfs.listCachePools();
-      assertTrue("No cache pools found", pit.hasNext());
-      CachePoolInfo info = pit.next();
-      assertEquals(pool, info.getPoolName());
-      assertEquals(groupName, info.getGroupName());
-      assertEquals(mode, info.getMode());
-      assertEquals(weight, (int)info.getWeight());
-      assertFalse("Unexpected # of cache pools found", pit.hasNext());
+    // Create and validate a pool
+    final String pool = "poolparty";
+    String groupName = "partygroup";
+    FsPermission mode = new FsPermission((short)0777);
+    int weight = 747;
+    dfs.addCachePool(new CachePoolInfo(pool)
+        .setGroupName(groupName)
+        .setMode(mode)
+        .setWeight(weight));
+    RemoteIterator<CachePoolInfo> pit = dfs.listCachePools();
+    assertTrue("No cache pools found", pit.hasNext());
+    CachePoolInfo info = pit.next();
+    assertEquals(pool, info.getPoolName());
+    assertEquals(groupName, info.getGroupName());
+    assertEquals(mode, info.getMode());
+    assertEquals(weight, (int)info.getWeight());
+    assertFalse("Unexpected # of cache pools found", pit.hasNext());
 
-      // Create some cache entries
-      int numEntries = 10;
-      String entryPrefix = "/party-";
-      for (int i=0; i<numEntries; i++) {
-        dfs.addPathBasedCacheDirective(
-            new PathBasedCacheDirective.Builder().
-              setPath(new Path(entryPrefix + i)).setPool(pool).build());
-      }
-      RemoteIterator<PathBasedCacheDescriptor> dit
-          = dfs.listPathBasedCacheDescriptors(null, null);
-      for (int i=0; i<numEntries; i++) {
-        assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
-        PathBasedCacheDescriptor cd = dit.next();
-        assertEquals(i+1, cd.getEntryId());
-        assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
-        assertEquals(pool, cd.getPool());
-      }
-      assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
+    // Create some cache entries
+    int numEntries = 10;
+    String entryPrefix = "/party-";
+    for (int i=0; i<numEntries; i++) {
+      dfs.addPathBasedCacheDirective(
+          new PathBasedCacheDirective.Builder().
+            setPath(new Path(entryPrefix + i)).setPool(pool).build());
+    }
+    RemoteIterator<PathBasedCacheDescriptor> dit
+        = dfs.listPathBasedCacheDescriptors(null, null);
+    for (int i=0; i<numEntries; i++) {
+      assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
+      PathBasedCacheDescriptor cd = dit.next();
+      assertEquals(i+1, cd.getEntryId());
+      assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
+      assertEquals(pool, cd.getPool());
+    }
+    assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
 
-      // Restart namenode
-      cluster.restartNameNode();
+    // Restart namenode
+    cluster.restartNameNode();
 
-      // Check that state came back up
-      pit = dfs.listCachePools();
-      assertTrue("No cache pools found", pit.hasNext());
-      info = pit.next();
-      assertEquals(pool, info.getPoolName());
-      assertEquals(pool, info.getPoolName());
-      assertEquals(groupName, info.getGroupName());
-      assertEquals(mode, info.getMode());
-      assertEquals(weight, (int)info.getWeight());
-      assertFalse("Unexpected # of cache pools found", pit.hasNext());
+    // Check that state came back up
+    pit = dfs.listCachePools();
+    assertTrue("No cache pools found", pit.hasNext());
+    info = pit.next();
+    assertEquals(pool, info.getPoolName());
+    assertEquals(pool, info.getPoolName());
+    assertEquals(groupName, info.getGroupName());
+    assertEquals(mode, info.getMode());
+    assertEquals(weight, (int)info.getWeight());
+    assertFalse("Unexpected # of cache pools found", pit.hasNext());
 
-      dit = dfs.listPathBasedCacheDescriptors(null, null);
-      for (int i=0; i<numEntries; i++) {
-        assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
-        PathBasedCacheDescriptor cd = dit.next();
-        assertEquals(i+1, cd.getEntryId());
-        assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
-        assertEquals(pool, cd.getPool());
-      }
-      assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
-    } finally {
-      cluster.shutdown();
-    }
+    dit = dfs.listPathBasedCacheDescriptors(null, null);
+    for (int i=0; i<numEntries; i++) {
+      assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
+      PathBasedCacheDescriptor cd = dit.next();
+      assertEquals(i+1, cd.getEntryId());
+      assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
+      assertEquals(pool, cd.getPool());
+    }
+    assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
   }
 
   private static void waitForCachedBlocks(NameNode nn,
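The rewrite of testCacheManagerRestart() drops the nested, locally constructed MiniDFSCluster and its try/finally in favor of the class-level cluster field: the running cluster is shut down and nulled first so its storage is released before the replacement starts, and the fixture's shared teardown now owns cleanup. This matters on Windows, where files still held open by the first cluster likely could not be deleted or reused by the second. A stripped-down sketch of that fixture pattern, with illustrative names:

import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Test;

public class SharedClusterSketch {
  private MiniDFSCluster cluster;  // shared fixture field

  @After
  public void tearDown() {
    if (cluster != null) {
      cluster.shutdown();  // single cleanup path for every test
    }
  }

  @Test
  public void testNeedsItsOwnClusterConfig() throws Exception {
    // Release the previous cluster's files before starting another one;
    // on Windows, files held open cannot be deleted out from under it.
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
    cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
        .numDataNodes(0).build();
    cluster.waitActive();
    // ... exercise the cluster; tearDown() shuts it down.
  }
}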