HDFS-6345. DFS.listCacheDirectives() should allow filtering based on cache directive ID. (wang)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1595086 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
68867fd1bc
commit
8f48760663
@ -366,6 +366,9 @@ Release 2.5.0 - UNRELEASED
|
|||||||
HDFS-5683. Better audit log messages for caching operations.
|
HDFS-5683. Better audit log messages for caching operations.
|
||||||
(Abhiraj Butala via wang)
|
(Abhiraj Butala via wang)
|
||||||
|
|
||||||
|
HDFS-6345. DFS.listCacheDirectives() should allow filtering based on
|
||||||
|
cache directive ID. (wang)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
|
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
|
||||||
|
@ -23,6 +23,10 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.BatchedRemoteIterator;
|
import org.apache.hadoop.fs.BatchedRemoteIterator;
|
||||||
|
import org.apache.hadoop.fs.InvalidRequestException;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* CacheDirectiveIterator is a remote iterator that iterates cache directives.
|
* CacheDirectiveIterator is a remote iterator that iterates cache directives.
|
||||||
@ -33,7 +37,7 @@
|
|||||||
public class CacheDirectiveIterator
|
public class CacheDirectiveIterator
|
||||||
extends BatchedRemoteIterator<Long, CacheDirectiveEntry> {
|
extends BatchedRemoteIterator<Long, CacheDirectiveEntry> {
|
||||||
|
|
||||||
private final CacheDirectiveInfo filter;
|
private CacheDirectiveInfo filter;
|
||||||
private final ClientProtocol namenode;
|
private final ClientProtocol namenode;
|
||||||
|
|
||||||
public CacheDirectiveIterator(ClientProtocol namenode,
|
public CacheDirectiveIterator(ClientProtocol namenode,
|
||||||
@ -43,10 +47,72 @@ public CacheDirectiveIterator(ClientProtocol namenode,
|
|||||||
this.filter = filter;
|
this.filter = filter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static CacheDirectiveInfo removeIdFromFilter(CacheDirectiveInfo filter) {
|
||||||
|
CacheDirectiveInfo.Builder builder = new CacheDirectiveInfo.Builder(filter);
|
||||||
|
builder.setId(null);
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used for compatibility when communicating with a server version that
|
||||||
|
* does not support filtering directives by ID.
|
||||||
|
*/
|
||||||
|
private static class SingleEntry implements
|
||||||
|
BatchedEntries<CacheDirectiveEntry> {
|
||||||
|
|
||||||
|
private final CacheDirectiveEntry entry;
|
||||||
|
|
||||||
|
public SingleEntry(final CacheDirectiveEntry entry) {
|
||||||
|
this.entry = entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CacheDirectiveEntry get(int i) {
|
||||||
|
if (i > 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int size() {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasMore() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey)
|
public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return namenode.listCacheDirectives(prevKey, filter);
|
BatchedEntries<CacheDirectiveEntry> entries = null;
|
||||||
|
try {
|
||||||
|
entries = namenode.listCacheDirectives(prevKey, filter);
|
||||||
|
} catch (IOException e) {
|
||||||
|
if (e.getMessage().contains("Filtering by ID is unsupported")) {
|
||||||
|
// Retry case for old servers, do the filtering client-side
|
||||||
|
long id = filter.getId();
|
||||||
|
filter = removeIdFromFilter(filter);
|
||||||
|
// Using id - 1 as prevId should get us a window containing the id
|
||||||
|
// This is somewhat brittle, since it depends on directives being
|
||||||
|
// returned in order of ascending ID.
|
||||||
|
entries = namenode.listCacheDirectives(id - 1, filter);
|
||||||
|
for (int i=0; i<entries.size(); i++) {
|
||||||
|
CacheDirectiveEntry entry = entries.get(i);
|
||||||
|
if (entry.getInfo().getId().equals((Long)id)) {
|
||||||
|
return new SingleEntry(entry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new RemoteException(InvalidRequestException.class.getName(),
|
||||||
|
"Did not find requested id " + id);
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
Preconditions.checkNotNull(entries);
|
||||||
|
return entries;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -691,15 +691,25 @@ public void removeDirective(long id, FSPermissionChecker pc)
|
|||||||
assert namesystem.hasReadLock();
|
assert namesystem.hasReadLock();
|
||||||
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
|
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
|
||||||
String filterPath = null;
|
String filterPath = null;
|
||||||
if (filter.getId() != null) {
|
|
||||||
throw new IOException("Filtering by ID is unsupported.");
|
|
||||||
}
|
|
||||||
if (filter.getPath() != null) {
|
if (filter.getPath() != null) {
|
||||||
filterPath = validatePath(filter);
|
filterPath = validatePath(filter);
|
||||||
}
|
}
|
||||||
if (filter.getReplication() != null) {
|
if (filter.getReplication() != null) {
|
||||||
throw new IOException("Filtering by replication is unsupported.");
|
throw new InvalidRequestException(
|
||||||
|
"Filtering by replication is unsupported.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Querying for a single ID
|
||||||
|
final Long id = filter.getId();
|
||||||
|
if (id != null) {
|
||||||
|
if (!directivesById.containsKey(id)) {
|
||||||
|
throw new InvalidRequestException("Did not find requested id " + id);
|
||||||
|
}
|
||||||
|
// Since we use a tailMap on directivesById, setting prev to id-1 gets
|
||||||
|
// us the directive with the id (if present)
|
||||||
|
prevId = id - 1;
|
||||||
|
}
|
||||||
|
|
||||||
ArrayList<CacheDirectiveEntry> replies =
|
ArrayList<CacheDirectiveEntry> replies =
|
||||||
new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
|
new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
|
||||||
int numReplies = 0;
|
int numReplies = 0;
|
||||||
@ -711,6 +721,14 @@ public void removeDirective(long id, FSPermissionChecker pc)
|
|||||||
}
|
}
|
||||||
CacheDirective curDirective = cur.getValue();
|
CacheDirective curDirective = cur.getValue();
|
||||||
CacheDirectiveInfo info = cur.getValue().toInfo();
|
CacheDirectiveInfo info = cur.getValue().toInfo();
|
||||||
|
|
||||||
|
// If the requested ID is present, it should be the first item.
|
||||||
|
// Hitting this case means the ID is not present, or we're on the second
|
||||||
|
// item and should break out.
|
||||||
|
if (id != null &&
|
||||||
|
!(info.getId().equals(id))) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
if (filter.getPool() != null &&
|
if (filter.getPool() != null &&
|
||||||
!info.getPool().equals(filter.getPool())) {
|
!info.getPool().equals(filter.getPool())) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -503,19 +503,21 @@ public String getName() {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getShortUsage() {
|
public String getShortUsage() {
|
||||||
return "[" + getName() + " [-stats] [-path <path>] [-pool <pool>]]\n";
|
return "[" + getName()
|
||||||
|
+ " [-stats] [-path <path>] [-pool <pool>] [-id <id>]\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getLongUsage() {
|
public String getLongUsage() {
|
||||||
TableListing listing = getOptionDescriptionListing();
|
TableListing listing = getOptionDescriptionListing();
|
||||||
|
listing.addRow("-stats", "List path-based cache directive statistics.");
|
||||||
listing.addRow("<path>", "List only " +
|
listing.addRow("<path>", "List only " +
|
||||||
"cache directives with this path. " +
|
"cache directives with this path. " +
|
||||||
"Note that if there is a cache directive for <path> " +
|
"Note that if there is a cache directive for <path> " +
|
||||||
"in a cache pool that we don't have read access for, it " +
|
"in a cache pool that we don't have read access for, it " +
|
||||||
"will not be listed.");
|
"will not be listed.");
|
||||||
listing.addRow("<pool>", "List only path cache directives in that pool.");
|
listing.addRow("<pool>", "List only path cache directives in that pool.");
|
||||||
listing.addRow("-stats", "List path-based cache directive statistics.");
|
listing.addRow("<id>", "List the cache directive with this id.");
|
||||||
return getShortUsage() + "\n" +
|
return getShortUsage() + "\n" +
|
||||||
"List cache directives.\n\n" +
|
"List cache directives.\n\n" +
|
||||||
listing.toString();
|
listing.toString();
|
||||||
@ -534,6 +536,10 @@ public int run(Configuration conf, List<String> args) throws IOException {
|
|||||||
builder.setPool(poolFilter);
|
builder.setPool(poolFilter);
|
||||||
}
|
}
|
||||||
boolean printStats = StringUtils.popOption("-stats", args);
|
boolean printStats = StringUtils.popOption("-stats", args);
|
||||||
|
String idFilter = StringUtils.popOptionWithArgument("-id", args);
|
||||||
|
if (idFilter != null) {
|
||||||
|
builder.setId(Long.parseLong(idFilter));
|
||||||
|
}
|
||||||
if (!args.isEmpty()) {
|
if (!args.isEmpty()) {
|
||||||
System.err.println("Can't understand argument: " + args.get(0));
|
System.err.println("Can't understand argument: " + args.get(0));
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -477,6 +477,12 @@ public void testAddRemoveDirectives() throws Exception {
|
|||||||
iter = dfs.listCacheDirectives(
|
iter = dfs.listCacheDirectives(
|
||||||
new CacheDirectiveInfo.Builder().setPool("pool2").build());
|
new CacheDirectiveInfo.Builder().setPool("pool2").build());
|
||||||
validateListAll(iter, betaId);
|
validateListAll(iter, betaId);
|
||||||
|
iter = dfs.listCacheDirectives(
|
||||||
|
new CacheDirectiveInfo.Builder().setId(alphaId2).build());
|
||||||
|
validateListAll(iter, alphaId2);
|
||||||
|
iter = dfs.listCacheDirectives(
|
||||||
|
new CacheDirectiveInfo.Builder().setId(relativeId).build());
|
||||||
|
validateListAll(iter, relativeId);
|
||||||
|
|
||||||
dfs.removeCacheDirective(betaId);
|
dfs.removeCacheDirective(betaId);
|
||||||
iter = dfs.listCacheDirectives(
|
iter = dfs.listCacheDirectives(
|
||||||
|
@ -519,5 +519,29 @@
|
|||||||
</comparator>
|
</comparator>
|
||||||
</comparators>
|
</comparators>
|
||||||
</test>
|
</test>
|
||||||
|
|
||||||
|
<test> <!--Tested -->
|
||||||
|
<description>Testing listing a single cache directive</description>
|
||||||
|
<test-commands>
|
||||||
|
<cache-admin-command>-addPool pool1</cache-admin-command>
|
||||||
|
<cache-admin-command>-addDirective -path /foo -pool pool1 -ttl 2d</cache-admin-command>
|
||||||
|
<cache-admin-command>-addDirective -path /bar -pool pool1 -ttl 24h</cache-admin-command>
|
||||||
|
<cache-admin-command>-addDirective -path /baz -replication 2 -pool pool1 -ttl 60m</cache-admin-command>
|
||||||
|
<cache-admin-command>-listDirectives -stats -id 30</cache-admin-command>
|
||||||
|
</test-commands>
|
||||||
|
<cleanup-commands>
|
||||||
|
<cache-admin-command>-removePool pool1</cache-admin-command>
|
||||||
|
</cleanup-commands>
|
||||||
|
<comparators>
|
||||||
|
<comparator>
|
||||||
|
<type>SubstringComparator</type>
|
||||||
|
<expected-output>Found 1 entry</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>SubstringComparator</type>
|
||||||
|
<expected-output>30 pool1 1</expected-output>
|
||||||
|
</comparator>
|
||||||
|
</comparators>
|
||||||
|
</test>
|
||||||
</tests>
|
</tests>
|
||||||
</configuration>
|
</configuration>
|
||||||
|
Loading…
Reference in New Issue
Block a user