HDFS-5224. Refactor PathBasedCache* methods to use a Path rather than a String. Contributed by Chris Nauroth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1531406 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth 2013-10-11 19:44:20 +00:00
parent 3fc8792b5c
commit 8111c3af6b
16 changed files with 248 additions and 87 deletions
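As a rough illustration of the client API that results from this refactor (not part of the patch itself): directives now carry an org.apache.hadoop.fs.Path and are constructed through a Builder instead of the old String-based constructor. The configuration, path, and pool name below are illustrative only and assume a cluster built from this branch with an existing cache pool.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;

public class CacheDirectiveSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumes fs.defaultFS points at a namenode built from this branch.
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);

    // The directive is built from a Path rather than a String.
    PathBasedCacheDirective directive = new PathBasedCacheDirective.Builder().
        setPath(new Path("/bar")).   // illustrative path
        setPool("pool1").            // illustrative, pre-existing pool
        build();

    PathBasedCacheDescriptor descriptor =
        dfs.addPathBasedCacheDirective(directive);
    dfs.removePathBasedCacheDescriptor(descriptor);
  }
}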


@@ -57,6 +57,9 @@ HDFS-4949 (Unreleased)
     HDFS-5304. Expose if a block replica is cached in getFileBlockLocations.
     (Contributed by Andrew Wang)
 
+    HDFS-5224. Refactor PathBasedCache* methods to use a Path rather than a
+    String. (cnauroth)
+
   OPTIMIZATIONS
 
   BUG FIXES


@@ -1591,7 +1591,12 @@ public Boolean next(final FileSystem fs, final Path p)
    */
   public PathBasedCacheDescriptor addPathBasedCacheDirective(
       PathBasedCacheDirective directive) throws IOException {
-    return dfs.addPathBasedCacheDirective(directive);
+    Path path = new Path(getPathName(fixRelativePart(directive.getPath()))).
+      makeQualified(getUri(), getWorkingDirectory());
+    return dfs.addPathBasedCacheDirective(new PathBasedCacheDirective.Builder().
+      setPath(path).
+      setPool(directive.getPool()).
+      build());
   }
 
   /**
@@ -1614,8 +1619,24 @@ public void removePathBasedCacheDescriptor(PathBasedCacheDescriptor descriptor)
    * @return A RemoteIterator which returns PathBasedCacheDescriptor objects.
    */
   public RemoteIterator<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(
-      String pool, String path) throws IOException {
-    return dfs.listPathBasedCacheDescriptors(pool, path);
+      String pool, final Path path) throws IOException {
+    String pathName = path != null ? getPathName(fixRelativePart(path)) : null;
+    final RemoteIterator<PathBasedCacheDescriptor> iter =
+        dfs.listPathBasedCacheDescriptors(pool, pathName);
+    return new RemoteIterator<PathBasedCacheDescriptor>() {
+      @Override
+      public boolean hasNext() throws IOException {
+        return iter.hasNext();
+      }
+      @Override
+      public PathBasedCacheDescriptor next() throws IOException {
+        PathBasedCacheDescriptor desc = iter.next();
+        Path qualPath = desc.getPath().makeQualified(getUri(), path);
+        return new PathBasedCacheDescriptor(desc.getEntryId(), qualPath,
+            desc.getPool());
+      }
+    };
   }
 
   /**
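Two side effects of this wrapping are worth noting: relative paths handed to addPathBasedCacheDirective are resolved against the client's working directory before being sent to the NameNode, and descriptors returned from listPathBasedCacheDescriptors carry fully qualified paths. A rough sketch, continuing from the example above (dfs and "pool1" are illustrative):

// Relative paths are now accepted and qualified client-side
// (fixRelativePart + makeQualified), so this no longer fails validation.
dfs.addPathBasedCacheDirective(new PathBasedCacheDirective.Builder().
    setPath(new Path("relative")).   // resolved against the working directory
    setPool("pool1").
    build());

// Listed descriptors expose a qualified Path; toUri().getPath() recovers
// the plain "/..." form stored by the NameNode.
RemoteIterator<PathBasedCacheDescriptor> it =
    dfs.listPathBasedCacheDescriptors("pool1", null);
while (it.hasNext()) {
  PathBasedCacheDescriptor d = it.next();
  System.out.println(d.getPath() + " -> " + d.getPath().toUri().getPath());
}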


@@ -33,12 +33,8 @@ public static final class EmptyPathError
       extends AddPathBasedCacheDirectiveException {
     private static final long serialVersionUID = 1L;
 
-    public EmptyPathError(String msg) {
-      super(msg);
-    }
-
-    public EmptyPathError(PathBasedCacheDirective directive) {
-      this("empty path in directive " + directive);
+    public EmptyPathError() {
+      super("empty path in directive");
     }
   }


@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
 
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
@@ -32,7 +33,7 @@
 public final class PathBasedCacheDescriptor extends PathBasedCacheDirective {
   private final long entryId;
 
-  public PathBasedCacheDescriptor(long entryId, String path, String pool) {
+  public PathBasedCacheDescriptor(long entryId, Path path, String pool) {
     super(path, pool);
     Preconditions.checkArgument(entryId > 0);
     this.entryId = entryId;


@@ -25,8 +25,8 @@
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.EmptyPathError;
 import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
 import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPathNameError;
@@ -36,21 +36,54 @@
 @InterfaceStability.Evolving
 @InterfaceAudience.Public
 public class PathBasedCacheDirective {
-  private final String path;
-  private final String pool;
-
-  public PathBasedCacheDirective(String path, String pool) {
-    Preconditions.checkNotNull(path);
-    Preconditions.checkNotNull(pool);
-    this.path = path;
-    this.pool = pool;
+
+  /**
+   * A builder for creating new PathBasedCacheDirective instances.
+   */
+  public static class Builder {
+    private Path path;
+    private String pool;
+
+    /**
+     * Builds a new PathBasedCacheDirective populated with the set properties.
+     *
+     * @return New PathBasedCacheDirective.
+     */
+    public PathBasedCacheDirective build() {
+      return new PathBasedCacheDirective(path, pool);
+    }
+
+    /**
+     * Sets the path used in this request.
+     *
+     * @param path The path used in this request.
+     * @return This builder, for call chaining.
+     */
+    public Builder setPath(Path path) {
+      this.path = path;
+      return this;
+    }
+
+    /**
+     * Sets the pool used in this request.
+     *
+     * @param pool The pool used in this request.
+     * @return This builder, for call chaining.
+     */
+    public Builder setPool(String pool) {
+      this.pool = pool;
+      return this;
+    }
   }
 
+  private final Path path;
+  private final String pool;
+
   /**
    * @return The path used in this request.
    */
-  public String getPath() {
+  public Path getPath() {
     return path;
   }
@@ -68,10 +101,7 @@ public String getPool() {
    * If this PathBasedCacheDirective is not valid.
    */
   public void validate() throws IOException {
-    if (path.isEmpty()) {
-      throw new EmptyPathError(this);
-    }
-    if (!DFSUtil.isValidName(path)) {
+    if (!DFSUtil.isValidName(path.toUri().getPath())) {
       throw new InvalidPathNameError(this);
     }
     if (pool.isEmpty()) {
@@ -108,4 +138,17 @@ public String toString() {
       append(" }");
     return builder.toString();
   }
+
+  /**
+   * Protected constructor.  Callers use Builder to create new instances.
+   *
+   * @param path The path used in this request.
+   * @param pool The pool used in this request.
+   */
+  protected PathBasedCacheDirective(Path path, String pool) {
+    Preconditions.checkNotNull(path);
+    Preconditions.checkNotNull(pool);
+    this.path = path;
+    this.pool = pool;
+  }
 };
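Call sites elsewhere in this patch migrate mechanically from the old constructor to the Builder; a condensed before/after sketch, with the path and pool purely illustrative:

// Before: directive built from raw strings.
//   PathBasedCacheDirective d = new PathBasedCacheDirective("/foo", "pool1");
// After: the path is an org.apache.hadoop.fs.Path and construction goes
// through the Builder (the constructor itself is now protected).
PathBasedCacheDirective d = new PathBasedCacheDirective.Builder().
    setPath(new Path("/foo")).
    setPool("pool1").
    build();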


@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.protocol;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.server.namenode.CachePool;
 
 import com.google.common.base.Preconditions;
@@ -65,6 +66,7 @@ public String toString() {
   }
 
   public PathBasedCacheDescriptor getDescriptor() {
-    return new PathBasedCacheDescriptor(entryId, path, pool.getPoolName());
+    return new PathBasedCacheDescriptor(entryId, new Path(path),
+        pool.getPoolName());
   }
 };


@@ -25,8 +25,10 @@
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.EmptyPathError;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@@ -176,6 +178,8 @@
 import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
 import org.apache.hadoop.security.token.Token;
 
+import org.apache.commons.lang.StringUtils;
+
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
@@ -1035,8 +1039,13 @@ public AddPathBasedCacheDirectiveResponseProto addPathBasedCacheDirective(
       throws ServiceException {
     try {
       PathBasedCacheDirectiveProto proto = request.getDirective();
-      PathBasedCacheDirective directive =
-          new PathBasedCacheDirective(proto.getPath(), proto.getPool());
+      if (StringUtils.isEmpty(proto.getPath())) {
+        throw new EmptyPathError();
+      }
+      PathBasedCacheDirective directive = new PathBasedCacheDirective.Builder().
+          setPath(new Path(proto.getPath())).
+          setPool(proto.getPool()).
+          build();
       PathBasedCacheDescriptor descriptor =
           server.addPathBasedCacheDirective(directive);
       AddPathBasedCacheDirectiveResponseProto.Builder builder =
@@ -1080,7 +1089,7 @@ public ListPathBasedCacheDescriptorsResponseProto listPathBasedCacheDescriptors(
         builder.addElements(
             ListPathBasedCacheDescriptorsElementProto.newBuilder().
                 setId(directive.getEntryId()).
-                setPath(directive.getPath()).
+                setPath(directive.getPath().toUri().getPath()).
                 setPool(directive.getPool()));
         prevId = directive.getEntryId();
       }


@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -1009,7 +1010,7 @@ public PathBasedCacheDescriptor addPathBasedCacheDirective(
     AddPathBasedCacheDirectiveRequestProto.Builder builder =
         AddPathBasedCacheDirectiveRequestProto.newBuilder();
     builder.setDirective(PathBasedCacheDirectiveProto.newBuilder()
-        .setPath(directive.getPath())
+        .setPath(directive.getPath().toUri().getPath())
         .setPool(directive.getPool())
         .build());
     AddPathBasedCacheDirectiveResponseProto result =
@@ -1047,7 +1048,7 @@ public PathBasedCacheDescriptor get(int i) {
         ListPathBasedCacheDescriptorsElementProto elementProto =
             response.getElements(i);
         return new PathBasedCacheDescriptor(elementProto.getId(),
-            elementProto.getPath(), elementProto.getPool());
+            new Path(elementProto.getPath()), elementProto.getPool());
       }
 
       @Override


@@ -36,6 +36,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
@@ -138,7 +139,7 @@ synchronized long getNextEntryId() {
   private synchronized PathBasedCacheEntry
       findEntry(PathBasedCacheDirective directive) {
     List<PathBasedCacheEntry> existing =
-        entriesByPath.get(directive.getPath());
+        entriesByPath.get(directive.getPath().toUri().getPath());
     if (existing == null) {
       return null;
     }
@@ -246,8 +247,8 @@ PathBasedCacheDescriptor unprotectedAddDirective(
     CachePool pool = cachePools.get(directive.getPool());
     // Add a new entry with the next available ID.
     PathBasedCacheEntry entry;
-    entry = new PathBasedCacheEntry(getNextEntryId(), directive.getPath(),
-        pool);
+    entry = new PathBasedCacheEntry(getNextEntryId(),
+        directive.getPath().toUri().getPath(), pool);
 
     unprotectedAddEntry(entry);
@@ -303,7 +304,7 @@ void unprotectedRemoveDescriptor(long id) throws IOException {
     assert namesystem.hasWriteLock();
     PathBasedCacheEntry existing = entriesById.get(id);
     // Remove the corresponding entry in entriesByPath.
-    String path = existing.getDescriptor().getPath();
+    String path = existing.getDescriptor().getPath().toUri().getPath();
     List<PathBasedCacheEntry> entries = entriesByPath.get(path);
     if (entries == null || !entries.remove(existing)) {
       throw new UnexpectedRemovePathBasedCacheDescriptorException(id);
@@ -315,10 +316,11 @@ void unprotectedRemoveDescriptor(long id) throws IOException {
     // Set the path as uncached in the namesystem
     try {
-      INode node = dir.getINode(existing.getDescriptor().getPath());
+      INode node = dir.getINode(existing.getDescriptor().getPath().toUri().
+          getPath());
       if (node != null && node.isFile()) {
-        namesystem.setCacheReplicationInt(existing.getDescriptor().getPath(),
-            (short) 0);
+        namesystem.setCacheReplicationInt(existing.getDescriptor().getPath().
+            toUri().getPath(), (short) 0);
       }
     } catch (IOException e) {
       LOG.warn("removeDescriptor " + id + ": failure while setting cache"


@@ -958,7 +958,7 @@ void logAddPathBasedCacheDirective(PathBasedCacheDirective directive,
       boolean toLogRpcIds) {
     AddPathBasedCacheDirectiveOp op = AddPathBasedCacheDirectiveOp.getInstance(
         cache.get())
-        .setPath(directive.getPath())
+        .setPath(directive.getPath().toUri().getPath())
         .setPool(directive.getPool());
     logRpcIds(op, toLogRpcIds);
     logEdit(op);


@@ -30,6 +30,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -641,8 +642,10 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
     }
     case OP_ADD_PATH_BASED_CACHE_DIRECTIVE: {
       AddPathBasedCacheDirectiveOp addOp = (AddPathBasedCacheDirectiveOp) op;
-      PathBasedCacheDirective d = new PathBasedCacheDirective(addOp.path,
-          addOp.pool);
+      PathBasedCacheDirective d = new PathBasedCacheDirective.Builder().
+          setPath(new Path(addOp.path)).
+          setPool(addOp.pool).
+          build();
       PathBasedCacheDescriptor descriptor =
           fsNamesys.getCacheManager().unprotectedAddDirective(d);


@@ -26,6 +26,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -164,8 +165,10 @@ public int run(Configuration conf, List<String> args) throws IOException {
       }
 
       DistributedFileSystem dfs = getDFS(conf);
-      PathBasedCacheDirective directive =
-          new PathBasedCacheDirective(path, poolName);
+      PathBasedCacheDirective directive = new PathBasedCacheDirective.Builder().
+          setPath(new Path(path)).
+          setPool(poolName).
+          build();
 
       try {
         PathBasedCacheDescriptor descriptor =
@@ -281,12 +284,14 @@ public int run(Configuration conf, List<String> args) throws IOException {
           build();
       DistributedFileSystem dfs = getDFS(conf);
       RemoteIterator<PathBasedCacheDescriptor> iter =
-          dfs.listPathBasedCacheDescriptors(poolFilter, pathFilter);
+          dfs.listPathBasedCacheDescriptors(poolFilter, pathFilter != null ?
+              new Path(pathFilter) : null);
       int numEntries = 0;
       while (iter.hasNext()) {
         PathBasedCacheDescriptor entry = iter.next();
         String row[] = new String[] {
-            "" + entry.getEntryId(), entry.getPool(), entry.getPath(),
+            "" + entry.getEntryId(), entry.getPool(),
+            entry.getPath().toUri().getPath(),
         };
         tableListing.addRow(row);
         numEntries++;


@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import org.junit.Test;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.EmptyPathError;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathBasedCacheDirectiveProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
public class TestClientNamenodeProtocolServerSideTranslatorPB {
@Test
public void testAddPathBasedCacheDirectiveEmptyPathError() throws Exception {
ClientProtocol server = mock(ClientProtocol.class);
RpcController controller = mock(RpcController.class);
AddPathBasedCacheDirectiveRequestProto request =
AddPathBasedCacheDirectiveRequestProto.newBuilder().
setDirective(PathBasedCacheDirectiveProto.newBuilder().
setPath("").
setPool("pool").
build()).
build();
ClientNamenodeProtocolServerSideTranslatorPB translator =
new ClientNamenodeProtocolServerSideTranslatorPB(server);
try {
translator.addPathBasedCacheDirective(controller, request);
fail("Expected ServiceException");
} catch (ServiceException e) {
assertNotNull(e.getCause());
assertTrue(e.getCause() instanceof EmptyPathError);
}
}
}


@@ -152,12 +152,14 @@ public void testCachePaths() throws Exception {
     waitForExpectedNumCachedBlocks(expected);
     // Cache and check each path in sequence
     for (int i=0; i<numFiles; i++) {
-      PathBasedCacheDirective directive = new PathBasedCacheDirective(paths
-          .get(i), pool);
+      PathBasedCacheDirective directive = new PathBasedCacheDirective.Builder().
+          setPath(new Path(paths.get(i))).
+          setPool(pool).
+          build();
       PathBasedCacheDescriptor descriptor =
           nnRpc.addPathBasedCacheDirective(directive);
       assertEquals("Descriptor does not match requested path", paths.get(i),
-          descriptor.getPath());
+          descriptor.getPath().toUri().getPath());
       assertEquals("Descriptor does not match requested pool", pool,
           descriptor.getPool());
       expected += numBlocksPerFile;
@@ -210,8 +212,10 @@ public void testCacheManagerRestart() throws Exception {
     int numEntries = 10;
     String entryPrefix = "/party-";
     for (int i=0; i<numEntries; i++) {
-      dfs.addPathBasedCacheDirective(new PathBasedCacheDirective(entryPrefix + i,
-          pool));
+      dfs.addPathBasedCacheDirective(new PathBasedCacheDirective.Builder().
+          setPath(new Path(entryPrefix + i)).
+          setPool(pool).
+          build());
     }
     RemoteIterator<PathBasedCacheDescriptor> dit
         = dfs.listPathBasedCacheDescriptors(null, null);
@@ -219,7 +223,7 @@ public void testCacheManagerRestart() throws Exception {
       assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
       PathBasedCacheDescriptor cd = dit.next();
       assertEquals(i+1, cd.getEntryId());
-      assertEquals(entryPrefix + i, cd.getPath());
+      assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
       assertEquals(pool, cd.getPool());
     }
     assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
@@ -243,7 +247,7 @@ public void testCacheManagerRestart() throws Exception {
       assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
       PathBasedCacheDescriptor cd = dit.next();
       assertEquals(i+1, cd.getEntryId());
-      assertEquals(entryPrefix + i, cd.getPath());
+      assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
       assertEquals(pool, cd.getPool());
     }
     assertFalse("Unexpected # of cache descriptors found", dit.hasNext());


@@ -243,7 +243,10 @@ public Object run() throws IOException, InterruptedException {
         .setWeight(1989));
     // OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
     PathBasedCacheDescriptor descriptor =
-        dfs.addPathBasedCacheDirective(new PathBasedCacheDirective("/bar", pool));
+        dfs.addPathBasedCacheDirective(new PathBasedCacheDirective.Builder().
+            setPath(new Path("/bar")).
+            setPool(pool).
+            build());
     // OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR 34
     dfs.removePathBasedCacheDescriptor(descriptor);
     // OP_REMOVE_CACHE_POOL 37


@@ -31,13 +31,13 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.EmptyPathError;
 import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPathNameError;
 import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
 import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
@@ -312,12 +312,18 @@ public void testAddRemoveDirectives() throws Exception {
     proto.addCachePool(new CachePoolInfo("pool4").
         setMode(new FsPermission((short)0)));
 
-    PathBasedCacheDirective alpha =
-        new PathBasedCacheDirective("/alpha", "pool1");
-    PathBasedCacheDirective beta =
-        new PathBasedCacheDirective("/beta", "pool2");
-    PathBasedCacheDirective delta =
-        new PathBasedCacheDirective("/delta", "pool1");
+    PathBasedCacheDirective alpha = new PathBasedCacheDirective.Builder().
+        setPath(new Path("/alpha")).
+        setPool("pool1").
+        build();
+    PathBasedCacheDirective beta = new PathBasedCacheDirective.Builder().
+        setPath(new Path("/beta")).
+        setPool("pool2").
+        build();
+    PathBasedCacheDirective delta = new PathBasedCacheDirective.Builder().
+        setPath(new Path("/delta")).
+        setPool("pool1").
+        build();
 
     PathBasedCacheDescriptor alphaD = addAsUnprivileged(alpha);
     PathBasedCacheDescriptor alphaD2 = addAsUnprivileged(alpha);
@@ -326,21 +332,20 @@ public void testAddRemoveDirectives() throws Exception {
     PathBasedCacheDescriptor betaD = addAsUnprivileged(beta);
 
     try {
-      addAsUnprivileged(new PathBasedCacheDirective("", "pool3"));
-      fail("expected an error when adding an empty path");
-    } catch (IOException ioe) {
-      assertTrue(ioe instanceof EmptyPathError);
-    }
-
-    try {
-      addAsUnprivileged(new PathBasedCacheDirective("/unicorn", "no_such_pool"));
+      addAsUnprivileged(new PathBasedCacheDirective.Builder().
+          setPath(new Path("/unicorn")).
+          setPool("no_such_pool").
+          build());
       fail("expected an error when adding to a non-existent pool.");
     } catch (IOException ioe) {
       assertTrue(ioe instanceof InvalidPoolNameError);
     }
 
     try {
-      addAsUnprivileged(new PathBasedCacheDirective("/blackhole", "pool4"));
+      addAsUnprivileged(new PathBasedCacheDirective.Builder().
+          setPath(new Path("/blackhole")).
+          setPool("pool4").
+          build());
       fail("expected an error when adding to a pool with " +
           "mode 0 (no permissions for anyone).");
     } catch (IOException ioe) {
@@ -348,43 +353,49 @@ public void testAddRemoveDirectives() throws Exception {
     }
 
     try {
-      addAsUnprivileged(new PathBasedCacheDirective("//illegal/path/", "pool1"));
+      addAsUnprivileged(new PathBasedCacheDirective.Builder().
+          setPath(new Path("/illegal:path/")).
+          setPool("pool1").
+          build());
       fail("expected an error when adding a malformed path " +
           "to the cache directives.");
-    } catch (IOException ioe) {
-      assertTrue(ioe instanceof InvalidPathNameError);
+    } catch (IllegalArgumentException e) {
+      // expected
     }
 
     try {
-      addAsUnprivileged(new PathBasedCacheDirective("/emptypoolname", ""));
+      addAsUnprivileged(new PathBasedCacheDirective.Builder().
+          setPath(new Path("/emptypoolname")).
+          setPool("").
+          build());
       Assert.fail("expected an error when adding a PathBasedCache " +
           "directive with an empty pool name.");
     } catch (IOException ioe) {
       Assert.assertTrue(ioe instanceof InvalidPoolNameError);
     }
 
-    try {
-      addAsUnprivileged(new PathBasedCacheDirective("bogus", "pool1"));
-      Assert.fail("expected an error when adding a PathBasedCache " +
-          "directive with a non-absolute path name.");
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe instanceof InvalidPathNameError);
-    }
-
     PathBasedCacheDescriptor deltaD = addAsUnprivileged(delta);
 
+    // We expect the following to succeed, because DistributedFileSystem
+    // qualifies the path.
+    PathBasedCacheDescriptor relativeD = addAsUnprivileged(
+        new PathBasedCacheDirective.Builder().
+            setPath(new Path("relative")).
+            setPool("pool1").
+            build());
+
     RemoteIterator<PathBasedCacheDescriptor> iter;
-    iter = proto.listPathBasedCacheDescriptors(0, null, null);
-    validateListAll(iter, alphaD, betaD, deltaD);
-    iter = proto.listPathBasedCacheDescriptors(0, "pool3", null);
+    iter = dfs.listPathBasedCacheDescriptors(null, null);
+    validateListAll(iter, alphaD, betaD, deltaD, relativeD);
+    iter = dfs.listPathBasedCacheDescriptors("pool3", null);
     Assert.assertFalse(iter.hasNext());
-    iter = proto.listPathBasedCacheDescriptors(0, "pool1", null);
-    validateListAll(iter, alphaD, deltaD);
-    iter = proto.listPathBasedCacheDescriptors(0, "pool2", null);
+    iter = dfs.listPathBasedCacheDescriptors("pool1", null);
+    validateListAll(iter, alphaD, deltaD, relativeD);
+    iter = dfs.listPathBasedCacheDescriptors("pool2", null);
     validateListAll(iter, betaD);
 
     dfs.removePathBasedCacheDescriptor(betaD);
-    iter = proto.listPathBasedCacheDescriptors(0, "pool2", null);
+    iter = dfs.listPathBasedCacheDescriptors("pool2", null);
     Assert.assertFalse(iter.hasNext());
 
     try {
@@ -409,7 +420,8 @@ public void testAddRemoveDirectives() throws Exception {
 
     dfs.removePathBasedCacheDescriptor(alphaD);
     dfs.removePathBasedCacheDescriptor(deltaD);
-    iter = proto.listPathBasedCacheDescriptors(0, null, null);
+    dfs.removePathBasedCacheDescriptor(relativeD);
+    iter = dfs.listPathBasedCacheDescriptors(null, null);
     assertFalse(iter.hasNext());
   }
 }