+ EnumSet<CacheFlag> result = EnumSet.noneOf(CacheFlag.class);
+ if ((flags & CacheFlagProto.FORCE_VALUE) == CacheFlagProto.FORCE_VALUE) {
+ result.add(CacheFlag.FORCE);
+ }
+ return result;
+ }
+
public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
if (fs == null)
return null;
@@ -1795,8 +1813,8 @@ public static CachePoolInfoProto convert(CachePoolInfo info) {
if (info.getMode() != null) {
builder.setMode(info.getMode().toShort());
}
- if (info.getWeight() != null) {
- builder.setWeight(info.getWeight());
+ if (info.getLimit() != null) {
+ builder.setLimit(info.getLimit());
}
return builder.build();
}
@@ -1814,8 +1832,8 @@ public static CachePoolInfo convert (CachePoolInfoProto proto) {
if (proto.hasMode()) {
info.setMode(new FsPermission((short)proto.getMode()));
}
- if (proto.hasWeight()) {
- info.setWeight(proto.getWeight());
+ if (proto.hasLimit()) {
+ info.setLimit(proto.getLimit());
}
return info;
}
@@ -1824,6 +1842,7 @@ public static CachePoolStatsProto convert(CachePoolStats stats) {
CachePoolStatsProto.Builder builder = CachePoolStatsProto.newBuilder();
builder.setBytesNeeded(stats.getBytesNeeded());
builder.setBytesCached(stats.getBytesCached());
+ builder.setBytesOverlimit(stats.getBytesOverlimit());
builder.setFilesNeeded(stats.getFilesNeeded());
builder.setFilesCached(stats.getFilesCached());
return builder.build();
@@ -1833,6 +1852,7 @@ public static CachePoolStats convert (CachePoolStatsProto proto) {
CachePoolStats.Builder builder = new CachePoolStats.Builder();
builder.setBytesNeeded(proto.getBytesNeeded());
builder.setBytesCached(proto.getBytesCached());
+ builder.setBytesOverlimit(proto.getBytesOverlimit());
builder.setFilesNeeded(proto.getFilesNeeded());
builder.setFilesCached(proto.getFilesCached());
return builder.build();
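
The added EnumSet conversion at the top of the PBHelper changes maps the proto bitmask to an EnumSet of CacheFlag and back. A minimal standalone sketch of that round trip, assuming a single FORCE bit with value 1 (FORCE_BIT below is a hypothetical stand-in for CacheFlagProto.FORCE_VALUE):

import java.util.EnumSet;

// Sketch only: mirrors the flag <-> bitmask conversion pattern used in PBHelper.
public class CacheFlagConversionSketch {
  enum CacheFlag { FORCE }

  static final int FORCE_BIT = 1; // hypothetical stand-in for CacheFlagProto.FORCE_VALUE

  // EnumSet -> wire bitmask
  static int convert(EnumSet<CacheFlag> flags) {
    int value = 0;
    if (flags.contains(CacheFlag.FORCE)) {
      value |= FORCE_BIT;
    }
    return value;
  }

  // wire bitmask -> EnumSet
  static EnumSet<CacheFlag> convert(int flags) {
    EnumSet<CacheFlag> result = EnumSet.noneOf(CacheFlag.class);
    if ((flags & FORCE_BIT) == FORCE_BIT) {
      result.add(CacheFlag.FORCE);
    }
    return result;
  }
}
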
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index a36dc84e74..c3ae8881c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -27,6 +27,9 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,6 +51,8 @@
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.Time;
+import com.google.common.base.Preconditions;
+
/**
* Scans the namesystem, scheduling blocks to be cached as appropriate.
*
@@ -79,26 +84,53 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
private final long intervalMs;
/**
- * True if we should rescan immediately, regardless of how much time
- * elapsed since the previous scan.
+ * The CacheReplicationMonitor (CRM) lock. Used to synchronize starting and
+ * waiting for rescan operations.
*/
- private boolean rescanImmediately;
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * Notifies the scan thread that an immediate rescan is needed.
+ */
+ private final Condition doRescan = lock.newCondition();
+
+ /**
+ * Notifies waiting threads that a rescan has finished.
+ */
+ private final Condition scanFinished = lock.newCondition();
+
+ /**
+ * Whether there are pending CacheManager operations that necessitate a
+ * CacheReplicationMonitor rescan. Protected by the CRM lock.
+ */
+ private boolean needsRescan = true;
+
+ /**
+ * Whether we are currently doing a rescan. Protected by the CRM lock.
+ */
+ private boolean isScanning = false;
+
+ /**
+ * The number of rescans completed. Used to wait for scans to finish.
+ * Protected by the CacheReplicationMonitor lock.
+ */
+ private long scanCount = 0;
+
+ /**
+ * True if this monitor should terminate. Protected by the CRM lock.
+ */
+ private boolean shutdown = false;
/**
* The monotonic time at which the current scan started.
*/
- private long scanTimeMs;
+ private long startTimeMs;
/**
* Mark status of the current scan.
*/
private boolean mark = false;
- /**
- * True if this monitor should terminate.
- */
- private boolean shutdown;
-
/**
* Cache directives found in the previous scan.
*/
@@ -108,7 +140,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
* Blocks found in the previous scan.
*/
private long scannedBlocks;
-
+
public CacheReplicationMonitor(FSNamesystem namesystem,
CacheManager cacheManager, long intervalMs) {
this.namesystem = namesystem;
@@ -120,41 +152,60 @@ public CacheReplicationMonitor(FSNamesystem namesystem,
@Override
public void run() {
- shutdown = false;
- rescanImmediately = true;
- scanTimeMs = 0;
+ startTimeMs = 0;
LOG.info("Starting CacheReplicationMonitor with interval " +
intervalMs + " milliseconds");
try {
long curTimeMs = Time.monotonicNow();
while (true) {
- synchronized(this) {
+ // Not all of the variables accessed here need the CRM lock, but take
+ // it anyway for simplicity
+ lock.lock();
+ try {
while (true) {
if (shutdown) {
LOG.info("Shutting down CacheReplicationMonitor");
return;
}
- if (rescanImmediately) {
- LOG.info("Rescanning on request");
- rescanImmediately = false;
+ if (needsRescan) {
+ LOG.info("Rescanning because of pending operations");
break;
}
- long delta = (scanTimeMs + intervalMs) - curTimeMs;
+ long delta = (startTimeMs + intervalMs) - curTimeMs;
if (delta <= 0) {
- LOG.info("Rescanning after " + (curTimeMs - scanTimeMs) +
+ LOG.info("Rescanning after " + (curTimeMs - startTimeMs) +
" milliseconds");
break;
}
- this.wait(delta);
+ doRescan.await(delta, TimeUnit.MILLISECONDS);
curTimeMs = Time.monotonicNow();
}
+ } finally {
+ lock.unlock();
}
- scanTimeMs = curTimeMs;
+ // Mark scan as started, clear needsRescan
+ lock.lock();
+ try {
+ isScanning = true;
+ needsRescan = false;
+ } finally {
+ lock.unlock();
+ }
+ startTimeMs = curTimeMs;
mark = !mark;
rescan();
curTimeMs = Time.monotonicNow();
+ // Retake the CRM lock to update synchronization-related variables
+ lock.lock();
+ try {
+ isScanning = false;
+ scanCount++;
+ scanFinished.signalAll();
+ } finally {
+ lock.unlock();
+ }
LOG.info("Scanned " + scannedDirectives + " directive(s) and " +
- scannedBlocks + " block(s) in " + (curTimeMs - scanTimeMs) + " " +
+ scannedBlocks + " block(s) in " + (curTimeMs - startTimeMs) + " " +
"millisecond(s).");
}
} catch (Throwable t) {
@@ -164,15 +215,91 @@ public void run() {
}
/**
- * Kick the monitor thread.
- *
- * If it is sleeping, it will wake up and start scanning.
- * If it is currently scanning, it will finish the scan and immediately do
- * another one.
+ * Similar to {@link CacheReplicationMonitor#waitForRescan()}, except it only
+ * waits if there are pending operations that necessitate a rescan as
+ * indicated by {@link #setNeedsRescan()}.
+ *
+ * Note that this call may release the FSN lock, so operations before and
+ * after are not necessarily atomic.
*/
- public synchronized void kick() {
- rescanImmediately = true;
- this.notifyAll();
+ public void waitForRescanIfNeeded() {
+ lock.lock();
+ try {
+ if (!needsRescan) {
+ return;
+ }
+ } finally {
+ lock.unlock();
+ }
+ waitForRescan();
+ }
+
+ /**
+ * Waits for a rescan to complete. This doesn't guarantee consistency with
+ * pending operations, only relative recency, since it will not force a new
+ * rescan if a rescan is already underway.
+ *
+ * Note that this call will release the FSN lock, so operations before and
+ * after are not atomic.
+ */
+ public void waitForRescan() {
+ // Drop the FSN lock temporarily and retake it after we finish waiting
+ // Need to handle both the read lock and the write lock
+ boolean retakeWriteLock = false;
+ if (namesystem.hasWriteLock()) {
+ namesystem.writeUnlock();
+ retakeWriteLock = true;
+ } else if (namesystem.hasReadLock()) {
+ namesystem.readUnlock();
+ } else {
+ // Expected to have at least one of the locks
+ Preconditions.checkState(false,
+ "Need to be holding either the read or write lock");
+ }
+ // try/finally for retaking FSN lock
+ try {
+ lock.lock();
+ // try/finally for releasing CRM lock
+ try {
+ // If no scan is already ongoing, mark the CRM as dirty and kick
+ if (!isScanning) {
+ needsRescan = true;
+ doRescan.signal();
+ }
+ // Wait until the scan finishes and the count advances
+ final long startCount = scanCount;
+ while (startCount >= scanCount) {
+ try {
+ scanFinished.await();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for CacheReplicationMonitor"
+ + " rescan", e);
+ break;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ } finally {
+ if (retakeWriteLock) {
+ namesystem.writeLock();
+ } else {
+ namesystem.readLock();
+ }
+ }
+ }
+
+ /**
+ * Indicates to the CacheReplicationMonitor that there have been CacheManager
+ * changes that require a rescan.
+ */
+ public void setNeedsRescan() {
+ lock.lock();
+ try {
+ this.needsRescan = true;
+ } finally {
+ lock.unlock();
+ }
}
/**
@@ -180,10 +307,14 @@ public synchronized void kick() {
*/
@Override
public void close() throws IOException {
- synchronized(this) {
+ lock.lock();
+ try {
if (shutdown) return;
shutdown = true;
- this.notifyAll();
+ doRescan.signalAll();
+ scanFinished.signalAll();
+ } finally {
+ lock.unlock();
}
try {
if (this.isAlive()) {
@@ -228,12 +359,14 @@ private void rescanCacheDirectives() {
// Reset the directive's statistics
directive.resetStatistics();
// Skip processing this entry if it has expired
- LOG.info("Directive expiry is at " + directive.getExpiryTime());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Directive expiry is at " + directive.getExpiryTime());
+ }
if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping directive id " + directive.getId()
+ " because it has expired (" + directive.getExpiryTime() + ">="
- + now);
+ + now + ")");
}
continue;
}
@@ -280,15 +413,27 @@ private void rescanFile(CacheDirective directive, INodeFile file) {
// Increment the "needed" statistics
directive.addFilesNeeded(1);
- long neededTotal = 0;
- for (BlockInfo blockInfo : blockInfos) {
- long neededByBlock =
- directive.getReplication() * blockInfo.getNumBytes();
- neededTotal += neededByBlock;
- }
+ // We don't cache UC blocks, don't add them to the total here
+ long neededTotal = file.computeFileSizeNotIncludingLastUcBlock() *
+ directive.getReplication();
directive.addBytesNeeded(neededTotal);
- // TODO: Enforce per-pool quotas
+ // The pool's bytesNeeded is incremented as we scan. If the demand
+ // thus far plus the demand of this file would exceed the pool's limit,
+ // do not cache this file.
+ CachePool pool = directive.getPool();
+ if (pool.getBytesNeeded() > pool.getLimit()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Skipping directive id %d file %s because "
+ + "limit of pool %s would be exceeded (%d > %d)",
+ directive.getId(),
+ file.getFullPathName(),
+ pool.getPoolName(),
+ pool.getBytesNeeded(),
+ pool.getLimit()));
+ }
+ return;
+ }
long cachedTotal = 0;
for (BlockInfo blockInfo : blockInfos) {
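
The rescanFile() change above charges demand against the pool as it scans: each file adds fileSize * replication to the pool's bytesNeeded, and the file is skipped once the running demand exceeds the pool's limit. A self-contained sketch of that accounting under those assumptions (hypothetical names, not the CacheReplicationMonitor API):

// Sketch of the per-pool limit accounting performed during a rescan.
public class PoolLimitSketch {
  public static void main(String[] args) {
    final long limit = 100L << 20;          // pool limit: 100 MB
    final short replication = 1;
    long bytesNeeded = 0;                   // pool demand accumulated so far
    long[] fileSizes = { 30L << 20, 50L << 20, 40L << 20 };
    for (long size : fileSizes) {
      long needed = size * replication;     // this file's demand
      bytesNeeded += needed;                // charged to the pool before the check
      if (bytesNeeded > limit) {
        System.out.println("Skipping file of " + needed
            + " bytes: pool demand " + bytesNeeded + " exceeds limit " + limit);
        continue;
      }
      System.out.println("Would cache file of " + needed + " bytes");
    }
  }
}
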
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index 94c62a9cbc..82bb4e8f6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -27,11 +27,12 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -45,13 +46,16 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -68,7 +72,7 @@
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
@@ -341,6 +345,67 @@ private static long validateExpiryTime(CacheDirectiveInfo directive,
return expiryTime;
}
+ /**
+ * Throws an exception if the CachePool does not have enough capacity to
+ * cache the given path at the replication factor.
+ *
+ * @param pool CachePool where the path is being cached
+ * @param path Path that is being cached
+ * @param replication Replication factor of the path
+ * @throws InvalidRequestException if the pool does not have enough capacity
+ */
+ private void checkLimit(CachePool pool, String path,
+ short replication) throws InvalidRequestException {
+ CacheDirectiveStats stats = computeNeeded(path, replication);
+ if (pool.getBytesNeeded() + (stats.getBytesNeeded() * replication) > pool
+ .getLimit()) {
+ throw new InvalidRequestException("Caching path " + path + " of size "
+ + stats.getBytesNeeded() / replication + " bytes at replication "
+ + replication + " would exceed pool " + pool.getPoolName()
+ + "'s remaining capacity of "
+ + (pool.getLimit() - pool.getBytesNeeded()) + " bytes.");
+ }
+ }
+
+ /**
+ * Computes the needed number of bytes and files for a path.
+ * @return CacheDirectiveStats describing the needed stats for this path
+ */
+ private CacheDirectiveStats computeNeeded(String path, short replication) {
+ FSDirectory fsDir = namesystem.getFSDirectory();
+ INode node;
+ long requestedBytes = 0;
+ long requestedFiles = 0;
+ CacheDirectiveStats.Builder builder = new CacheDirectiveStats.Builder();
+ try {
+ node = fsDir.getINode(path);
+ } catch (UnresolvedLinkException e) {
+ // We don't cache through symlinks
+ return builder.build();
+ }
+ if (node == null) {
+ return builder.build();
+ }
+ if (node.isFile()) {
+ requestedFiles = 1;
+ INodeFile file = node.asFile();
+ requestedBytes = file.computeFileSize();
+ } else if (node.isDirectory()) {
+ INodeDirectory dir = node.asDirectory();
+ ReadOnlyList<INode> children = dir.getChildrenList(null);
+ requestedFiles = children.size();
+ for (INode child : children) {
+ if (child.isFile()) {
+ requestedBytes += child.asFile().computeFileSize();
+ }
+ }
+ }
+ return new CacheDirectiveStats.Builder()
+ .setBytesNeeded(requestedBytes)
+ .setFilesCached(requestedFiles)
+ .build();
+ }
+
/**
* Get a CacheDirective by ID, validating the ID and that the directive
* exists.
@@ -384,6 +449,15 @@ private void addInternal(CacheDirective directive, CachePool pool) {
directivesByPath.put(path, directives);
}
directives.add(directive);
+ // Fix up pool stats
+ CacheDirectiveStats stats =
+ computeNeeded(directive.getPath(), directive.getReplication());
+ directive.addBytesNeeded(stats.getBytesNeeded());
+ directive.addFilesNeeded(directive.getFilesNeeded());
+
+ if (monitor != null) {
+ monitor.setNeedsRescan();
+ }
}
/**
@@ -407,7 +481,7 @@ CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive)
}
public CacheDirectiveInfo addDirective(
- CacheDirectiveInfo info, FSPermissionChecker pc)
+ CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)
throws IOException {
assert namesystem.hasWriteLock();
CacheDirective directive;
@@ -418,6 +492,14 @@ public CacheDirectiveInfo addDirective(
short replication = validateReplication(info, (short)1);
long expiryTime = validateExpiryTime(info,
CacheDirectiveInfo.Expiration.EXPIRY_NEVER);
+ // Do quota validation if required
+ if (!flags.contains(CacheFlag.FORCE)) {
+ // Can't kick and wait if caching is disabled
+ if (monitor != null) {
+ monitor.waitForRescan();
+ }
+ checkLimit(pool, path, replication);
+ }
// All validation passed
// Add a new entry with the next available ID.
long id = getNextDirectiveId();
@@ -428,14 +510,11 @@ public CacheDirectiveInfo addDirective(
throw e;
}
LOG.info("addDirective of " + info + " successful.");
- if (monitor != null) {
- monitor.kick();
- }
return directive.toInfo();
}
public void modifyDirective(CacheDirectiveInfo info,
- FSPermissionChecker pc) throws IOException {
+ FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException {
assert namesystem.hasWriteLock();
String idString =
(info.getId() == null) ?
@@ -463,6 +542,13 @@ public void modifyDirective(CacheDirectiveInfo info,
if (info.getPool() != null) {
pool = getCachePool(validatePoolName(info));
checkWritePermission(pc, pool);
+ if (!flags.contains(CacheFlag.FORCE)) {
+ // Can't kick and wait if caching is disabled
+ if (monitor != null) {
+ monitor.waitForRescan();
+ }
+ checkLimit(pool, path, replication);
+ }
}
removeInternal(prevEntry);
CacheDirective newEntry =
@@ -489,9 +575,18 @@ public void removeInternal(CacheDirective directive)
if (directives.size() == 0) {
directivesByPath.remove(path);
}
+ // Fix up the stats from removing the pool
+ final CachePool pool = directive.getPool();
+ directive.addBytesNeeded(-directive.getBytesNeeded());
+ directive.addFilesNeeded(-directive.getFilesNeeded());
+
directivesById.remove(directive.getId());
- directive.getPool().getDirectiveList().remove(directive);
+ pool.getDirectiveList().remove(directive);
assert directive.getPool() == null;
+
+ if (monitor != null) {
+ monitor.setNeedsRescan();
+ }
}
public void removeDirective(long id, FSPermissionChecker pc)
@@ -505,9 +600,6 @@ public void removeDirective(long id, FSPermissionChecker pc)
LOG.warn("removeDirective of " + id + " failed: ", e);
throw e;
}
- if (monitor != null) {
- monitor.kick();
- }
LOG.info("removeDirective of " + id + " successful.");
}
@@ -527,6 +619,9 @@ public void removeDirective(long id, FSPermissionChecker pc)
if (filter.getReplication() != null) {
throw new IOException("Filtering by replication is unsupported.");
}
+ if (monitor != null) {
+ monitor.waitForRescanIfNeeded();
+ }
ArrayList<CacheDirectiveEntry> replies =
new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
int numReplies = 0;
@@ -573,16 +668,22 @@ public void removeDirective(long id, FSPermissionChecker pc)
public CachePoolInfo addCachePool(CachePoolInfo info)
throws IOException {
assert namesystem.hasWriteLock();
- CachePoolInfo.validate(info);
- String poolName = info.getPoolName();
- CachePool pool = cachePools.get(poolName);
- if (pool != null) {
- throw new InvalidRequestException("Cache pool " + poolName
- + " already exists.");
+ CachePool pool;
+ try {
+ CachePoolInfo.validate(info);
+ String poolName = info.getPoolName();
+ pool = cachePools.get(poolName);
+ if (pool != null) {
+ throw new InvalidRequestException("Cache pool " + poolName
+ + " already exists.");
+ }
+ pool = CachePool.createFromInfoAndDefaults(info);
+ cachePools.put(pool.getPoolName(), pool);
+ } catch (IOException e) {
+ LOG.info("addCachePool of " + info + " failed: ", e);
+ throw e;
}
- pool = CachePool.createFromInfoAndDefaults(info);
- cachePools.put(pool.getPoolName(), pool);
- LOG.info("Created new cache pool " + pool);
+ LOG.info("addCachePool of " + info + " successful.");
return pool.getInfo(true);
}
@@ -597,42 +698,51 @@ public CachePoolInfo addCachePool(CachePoolInfo info)
public void modifyCachePool(CachePoolInfo info)
throws IOException {
assert namesystem.hasWriteLock();
- CachePoolInfo.validate(info);
- String poolName = info.getPoolName();
- CachePool pool = cachePools.get(poolName);
- if (pool == null) {
- throw new InvalidRequestException("Cache pool " + poolName
- + " does not exist.");
- }
StringBuilder bld = new StringBuilder();
- String prefix = "";
- if (info.getOwnerName() != null) {
- pool.setOwnerName(info.getOwnerName());
- bld.append(prefix).
- append("set owner to ").append(info.getOwnerName());
- prefix = "; ";
+ try {
+ CachePoolInfo.validate(info);
+ String poolName = info.getPoolName();
+ CachePool pool = cachePools.get(poolName);
+ if (pool == null) {
+ throw new InvalidRequestException("Cache pool " + poolName
+ + " does not exist.");
+ }
+ String prefix = "";
+ if (info.getOwnerName() != null) {
+ pool.setOwnerName(info.getOwnerName());
+ bld.append(prefix).
+ append("set owner to ").append(info.getOwnerName());
+ prefix = "; ";
+ }
+ if (info.getGroupName() != null) {
+ pool.setGroupName(info.getGroupName());
+ bld.append(prefix).
+ append("set group to ").append(info.getGroupName());
+ prefix = "; ";
+ }
+ if (info.getMode() != null) {
+ pool.setMode(info.getMode());
+ bld.append(prefix).append("set mode to " + info.getMode());
+ prefix = "; ";
+ }
+ if (info.getLimit() != null) {
+ pool.setLimit(info.getLimit());
+ bld.append(prefix).append("set limit to " + info.getLimit());
+ prefix = "; ";
+ // New limit changes stats, need to set needs refresh
+ if (monitor != null) {
+ monitor.setNeedsRescan();
+ }
+ }
+ if (prefix.isEmpty()) {
+ bld.append("no changes.");
+ }
+ } catch (IOException e) {
+ LOG.info("modifyCachePool of " + info + " failed: ", e);
+ throw e;
}
- if (info.getGroupName() != null) {
- pool.setGroupName(info.getGroupName());
- bld.append(prefix).
- append("set group to ").append(info.getGroupName());
- prefix = "; ";
- }
- if (info.getMode() != null) {
- pool.setMode(info.getMode());
- bld.append(prefix).append("set mode to " + info.getMode());
- prefix = "; ";
- }
- if (info.getWeight() != null) {
- pool.setWeight(info.getWeight());
- bld.append(prefix).
- append("set weight to ").append(info.getWeight());
- prefix = "; ";
- }
- if (prefix.isEmpty()) {
- bld.append("no changes.");
- }
- LOG.info("modified " + poolName + "; " + bld.toString());
+ LOG.info("modifyCachePool of " + info.getPoolName() + " successful; "
+ + bld.toString());
}
/**
@@ -646,28 +756,37 @@ public void modifyCachePool(CachePoolInfo info)
public void removeCachePool(String poolName)
throws IOException {
assert namesystem.hasWriteLock();
- CachePoolInfo.validateName(poolName);
- CachePool pool = cachePools.remove(poolName);
- if (pool == null) {
- throw new InvalidRequestException(
- "Cannot remove non-existent cache pool " + poolName);
- }
- // Remove all directives in this pool.
- Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
- while (iter.hasNext()) {
- CacheDirective directive = iter.next();
- directivesByPath.remove(directive.getPath());
- directivesById.remove(directive.getId());
- iter.remove();
- }
- if (monitor != null) {
- monitor.kick();
+ try {
+ CachePoolInfo.validateName(poolName);
+ CachePool pool = cachePools.remove(poolName);
+ if (pool == null) {
+ throw new InvalidRequestException(
+ "Cannot remove non-existent cache pool " + poolName);
+ }
+ // Remove all directives in this pool.
+ Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
+ while (iter.hasNext()) {
+ CacheDirective directive = iter.next();
+ directivesByPath.remove(directive.getPath());
+ directivesById.remove(directive.getId());
+ iter.remove();
+ }
+ if (monitor != null) {
+ monitor.setNeedsRescan();
+ }
+ } catch (IOException e) {
+ LOG.info("removeCachePool of " + poolName + " failed: ", e);
+ throw e;
}
+ LOG.info("removeCachePool of " + poolName + " successful.");
}
public BatchedListEntries<CachePoolEntry>
listCachePools(FSPermissionChecker pc, String prevKey) {
assert namesystem.hasReadLock();
+ if (monitor != null) {
+ monitor.waitForRescanIfNeeded();
+ }
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
ArrayList<CachePoolEntry> results =
new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
@@ -782,7 +901,7 @@ private void processCacheReportImpl(final DatanodeDescriptor datanode,
* @param sdPath path of the storage directory
* @throws IOException
*/
- public void saveState(DataOutput out, String sdPath)
+ public void saveState(DataOutputStream out, String sdPath)
throws IOException {
out.writeLong(nextDirectiveId);
savePools(out, sdPath);
@@ -805,7 +924,7 @@ public void loadState(DataInput in) throws IOException {
/**
* Save cache pools to fsimage
*/
- private void savePools(DataOutput out,
+ private void savePools(DataOutputStream out,
String sdPath) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_POOLS, sdPath);
@@ -814,7 +933,7 @@ private void savePools(DataOutput out,
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(cachePools.size());
for (CachePool pool: cachePools.values()) {
- pool.getInfo(true).writeTo(out);
+ FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true));
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
@@ -823,7 +942,7 @@ private void savePools(DataOutput out,
/*
* Save cache entries to fsimage
*/
- private void saveDirectives(DataOutput out, String sdPath)
+ private void saveDirectives(DataOutputStream out, String sdPath)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
@@ -832,11 +951,7 @@ private void saveDirectives(DataOutput out, String sdPath)
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(directivesById.size());
for (CacheDirective directive : directivesById.values()) {
- out.writeLong(directive.getId());
- Text.writeString(out, directive.getPath());
- out.writeShort(directive.getReplication());
- Text.writeString(out, directive.getPool().getPoolName());
- out.writeLong(directive.getExpiryTime());
+ FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo());
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
@@ -854,7 +969,7 @@ private void loadPools(DataInput in)
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numberOfPools; i++) {
- addCachePool(CachePoolInfo.readFrom(in));
+ addCachePool(FSImageSerialization.readCachePoolInfo(in));
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
@@ -871,19 +986,17 @@ private void loadDirectives(DataInput in) throws IOException {
prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numDirectives; i++) {
- long directiveId = in.readLong();
- String path = Text.readString(in);
- short replication = in.readShort();
- String poolName = Text.readString(in);
- long expiryTime = in.readLong();
+ CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
// Get pool reference by looking it up in the map
+ final String poolName = info.getPool();
CachePool pool = cachePools.get(poolName);
if (pool == null) {
throw new IOException("Directive refers to pool " + poolName +
", which does not exist.");
}
CacheDirective directive =
- new CacheDirective(directiveId, path, replication, expiryTime);
+ new CacheDirective(info.getId(), info.getPath().toUri().getPath(),
+ info.getReplication(), info.getExpiration().getAbsoluteMillis());
boolean addedDirective = pool.getDirectiveList().add(directive);
assert addedDirective;
if (directivesById.put(directive.getId(), directive) != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
index 249ea66b1d..3da7437acc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
@@ -49,8 +49,8 @@
public final class CachePool {
public static final Log LOG = LogFactory.getLog(CachePool.class);
- public static final int DEFAULT_WEIGHT = 100;
-
+ public static final long DEFAULT_LIMIT = Long.MAX_VALUE;
+
@Nonnull
private final String poolName;
@@ -71,7 +71,10 @@ public final class CachePool {
@Nonnull
private FsPermission mode;
- private int weight;
+ /**
+ * Maximum number of bytes that can be cached in this pool.
+ */
+ private long limit;
private long bytesNeeded;
private long bytesCached;
@@ -118,10 +121,10 @@ static CachePool createFromInfoAndDefaults(CachePoolInfo info)
}
FsPermission mode = (info.getMode() == null) ?
FsPermission.getCachePoolDefault() : info.getMode();
- Integer weight = (info.getWeight() == null) ?
- DEFAULT_WEIGHT : info.getWeight();
+ long limit = info.getLimit() == null ?
+ DEFAULT_LIMIT : info.getLimit();
return new CachePool(info.getPoolName(),
- ownerName, groupName, mode, weight);
+ ownerName, groupName, mode, limit);
}
/**
@@ -131,11 +134,11 @@ static CachePool createFromInfoAndDefaults(CachePoolInfo info)
static CachePool createFromInfo(CachePoolInfo info) {
return new CachePool(info.getPoolName(),
info.getOwnerName(), info.getGroupName(),
- info.getMode(), info.getWeight());
+ info.getMode(), info.getLimit());
}
CachePool(String poolName, String ownerName, String groupName,
- FsPermission mode, int weight) {
+ FsPermission mode, long limit) {
Preconditions.checkNotNull(poolName);
Preconditions.checkNotNull(ownerName);
Preconditions.checkNotNull(groupName);
@@ -144,7 +147,7 @@ static CachePool createFromInfo(CachePoolInfo info) {
this.ownerName = ownerName;
this.groupName = groupName;
this.mode = new FsPermission(mode);
- this.weight = weight;
+ this.limit = limit;
}
public String getPoolName() {
@@ -177,16 +180,16 @@ public CachePool setMode(FsPermission mode) {
this.mode = new FsPermission(mode);
return this;
}
-
- public int getWeight() {
- return weight;
+
+ public long getLimit() {
+ return limit;
}
- public CachePool setWeight(int weight) {
- this.weight = weight;
+ public CachePool setLimit(long bytes) {
+ this.limit = bytes;
return this;
}
-
+
/**
* Get either full or partial information about this CachePool.
*
@@ -204,7 +207,7 @@ CachePoolInfo getInfo(boolean fullInfo) {
return info.setOwnerName(ownerName).
setGroupName(groupName).
setMode(new FsPermission(mode)).
- setWeight(weight);
+ setLimit(limit);
}
/**
@@ -241,6 +244,10 @@ public long getBytesCached() {
return bytesCached;
}
+ public long getBytesOverlimit() {
+ return Math.max(bytesNeeded-limit, 0);
+ }
+
public long getFilesNeeded() {
return filesNeeded;
}
@@ -258,6 +265,7 @@ private CachePoolStats getStats() {
return new CachePoolStats.Builder().
setBytesNeeded(bytesNeeded).
setBytesCached(bytesCached).
+ setBytesOverlimit(getBytesOverlimit()).
setFilesNeeded(filesNeeded).
setFilesCached(filesCached).
build();
@@ -291,7 +299,7 @@ public String toString() {
append(", ownerName:").append(ownerName).
append(", groupName:").append(groupName).
append(", mode:").append(mode).
- append(", weight:").append(weight).
+ append(", limit:").append(limit).
append(" }").toString();
}
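
CachePool#getBytesOverlimit() above reports how far demand exceeds the limit, floored at zero. A quick standalone check of that arithmetic:

// Mirrors CachePool#getBytesOverlimit: demand beyond the limit, never negative.
public class OverlimitSketch {
  static long bytesOverlimit(long bytesNeeded, long limit) {
    return Math.max(bytesNeeded - limit, 0);
  }

  public static void main(String[] args) {
    System.out.println(bytesOverlimit(150L << 20, 100L << 20)); // 52428800 (50 MB over)
    System.out.println(bytesOverlimit(80L << 20, 100L << 20));  // 0 (under the limit)
  }
}
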
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index d9c091ba74..be328f7177 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -24,12 +24,14 @@
import java.io.InputStream;
import java.util.Arrays;
import java.util.EnumMap;
+import java.util.EnumSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -650,7 +652,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
ModifyCacheDirectiveInfoOp modifyOp =
(ModifyCacheDirectiveInfoOp) op;
fsNamesys.getCacheManager().modifyDirective(
- modifyOp.directive, null);
+ modifyOp.directive, null, EnumSet.of(CacheFlag.FORCE));
if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 5b81d3a7f7..6a852c4367 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -64,7 +64,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Date;
import java.util.EnumMap;
import java.util.List;
import java.util.zip.CheckedInputStream;
@@ -76,7 +75,6 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -2895,56 +2893,25 @@ public AddCacheDirectiveInfoOp setDirective(
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
- long id = FSImageSerialization.readLong(in);
- String path = FSImageSerialization.readString(in);
- short replication = FSImageSerialization.readShort(in);
- String pool = FSImageSerialization.readString(in);
- long expiryTime = FSImageSerialization.readLong(in);
- directive = new CacheDirectiveInfo.Builder().
- setId(id).
- setPath(new Path(path)).
- setReplication(replication).
- setPool(pool).
- setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiryTime)).
- build();
+ directive = FSImageSerialization.readCacheDirectiveInfo(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
- FSImageSerialization.writeLong(directive.getId(), out);
- FSImageSerialization.writeString(directive.getPath().toUri().getPath(), out);
- FSImageSerialization.writeShort(directive.getReplication(), out);
- FSImageSerialization.writeString(directive.getPool(), out);
- FSImageSerialization.writeLong(
- directive.getExpiration().getMillis(), out);
+ FSImageSerialization.writeCacheDirectiveInfo(out, directive);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
- XMLUtils.addSaxString(contentHandler, "ID",
- directive.getId().toString());
- XMLUtils.addSaxString(contentHandler, "PATH",
- directive.getPath().toUri().getPath());
- XMLUtils.addSaxString(contentHandler, "REPLICATION",
- Short.toString(directive.getReplication()));
- XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
- XMLUtils.addSaxString(contentHandler, "EXPIRATION",
- "" + directive.getExpiration().getMillis());
+ FSImageSerialization.writeCacheDirectiveInfo(contentHandler, directive);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
- directive = new CacheDirectiveInfo.Builder().
- setId(Long.parseLong(st.getValue("ID"))).
- setPath(new Path(st.getValue("PATH"))).
- setReplication(Short.parseShort(st.getValue("REPLICATION"))).
- setPool(st.getValue("POOL")).
- setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
- Long.parseLong(st.getValue("EXPIRATION")))).
- build();
+ directive = FSImageSerialization.readCacheDirectiveInfo(st);
readRpcIdsFromXml(st);
}
@@ -2988,104 +2955,25 @@ public ModifyCacheDirectiveInfoOp setDirective(
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
- CacheDirectiveInfo.Builder builder =
- new CacheDirectiveInfo.Builder();
- builder.setId(FSImageSerialization.readLong(in));
- byte flags = in.readByte();
- if ((flags & 0x1) != 0) {
- builder.setPath(new Path(FSImageSerialization.readString(in)));
- }
- if ((flags & 0x2) != 0) {
- builder.setReplication(FSImageSerialization.readShort(in));
- }
- if ((flags & 0x4) != 0) {
- builder.setPool(FSImageSerialization.readString(in));
- }
- if ((flags & 0x8) != 0) {
- builder.setExpiration(
- CacheDirectiveInfo.Expiration.newAbsolute(
- FSImageSerialization.readLong(in)));
- }
- if ((flags & ~0xF) != 0) {
- throw new IOException("unknown flags set in " +
- "ModifyCacheDirectiveInfoOp: " + flags);
- }
- this.directive = builder.build();
+ this.directive = FSImageSerialization.readCacheDirectiveInfo(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
- FSImageSerialization.writeLong(directive.getId(), out);
- byte flags = (byte)(
- ((directive.getPath() != null) ? 0x1 : 0) |
- ((directive.getReplication() != null) ? 0x2 : 0) |
- ((directive.getPool() != null) ? 0x4 : 0) |
- ((directive.getExpiration() != null) ? 0x8 : 0)
- );
- out.writeByte(flags);
- if (directive.getPath() != null) {
- FSImageSerialization.writeString(
- directive.getPath().toUri().getPath(), out);
- }
- if (directive.getReplication() != null) {
- FSImageSerialization.writeShort(directive.getReplication(), out);
- }
- if (directive.getPool() != null) {
- FSImageSerialization.writeString(directive.getPool(), out);
- }
- if (directive.getExpiration() != null) {
- FSImageSerialization.writeLong(directive.getExpiration().getMillis(),
- out);
- }
+ FSImageSerialization.writeCacheDirectiveInfo(out, directive);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
- XMLUtils.addSaxString(contentHandler, "ID",
- Long.toString(directive.getId()));
- if (directive.getPath() != null) {
- XMLUtils.addSaxString(contentHandler, "PATH",
- directive.getPath().toUri().getPath());
- }
- if (directive.getReplication() != null) {
- XMLUtils.addSaxString(contentHandler, "REPLICATION",
- Short.toString(directive.getReplication()));
- }
- if (directive.getPool() != null) {
- XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
- }
- if (directive.getExpiration() != null) {
- XMLUtils.addSaxString(contentHandler, "EXPIRATION",
- "" + directive.getExpiration().getMillis());
- }
+ FSImageSerialization.writeCacheDirectiveInfo(contentHandler, directive);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
- CacheDirectiveInfo.Builder builder =
- new CacheDirectiveInfo.Builder();
- builder.setId(Long.parseLong(st.getValue("ID")));
- String path = st.getValueOrNull("PATH");
- if (path != null) {
- builder.setPath(new Path(path));
- }
- String replicationString = st.getValueOrNull("REPLICATION");
- if (replicationString != null) {
- builder.setReplication(Short.parseShort(replicationString));
- }
- String pool = st.getValueOrNull("POOL");
- if (pool != null) {
- builder.setPool(pool);
- }
- String expiryTime = st.getValueOrNull("EXPIRATION");
- if (expiryTime != null) {
- builder.setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
- Long.parseLong(expiryTime)));
- }
- this.directive = builder.build();
+ this.directive = FSImageSerialization.readCacheDirectiveInfo(st);
readRpcIdsFromXml(st);
}
@@ -3184,30 +3072,35 @@ static AddCachePoolOp getInstance(OpInstanceCache cache) {
public AddCachePoolOp setPool(CachePoolInfo info) {
this.info = info;
+ assert(info.getPoolName() != null);
+ assert(info.getOwnerName() != null);
+ assert(info.getGroupName() != null);
+ assert(info.getMode() != null);
+ assert(info.getLimit() != null);
return this;
}
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
- info = CachePoolInfo.readFrom(in);
+ info = FSImageSerialization.readCachePoolInfo(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
- info.writeTo(out);
+ FSImageSerialization.writeCachePoolInfo(out, info);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
- info.writeXmlTo(contentHandler);
+ FSImageSerialization.writeCachePoolInfo(contentHandler, info);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
- this.info = CachePoolInfo.readXmlFrom(st);
+ this.info = FSImageSerialization.readCachePoolInfo(st);
readRpcIdsFromXml(st);
}
@@ -3219,7 +3112,7 @@ public String toString() {
builder.append("ownerName=" + info.getOwnerName() + ",");
builder.append("groupName=" + info.getGroupName() + ",");
builder.append("mode=" + Short.toString(info.getMode().toShort()) + ",");
- builder.append("weight=" + Integer.toString(info.getWeight()));
+ builder.append("limit=" + Long.toString(info.getLimit()));
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
@@ -3245,25 +3138,25 @@ public ModifyCachePoolOp setInfo(CachePoolInfo info) {
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
- info = CachePoolInfo.readFrom(in);
+ info = FSImageSerialization.readCachePoolInfo(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
- info.writeTo(out);
+ FSImageSerialization.writeCachePoolInfo(out, info);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
- cachePoolInfoToXml(contentHandler, info);
+ FSImageSerialization.writeCachePoolInfo(contentHandler, info);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
- this.info = cachePoolInfoFromXml(st);
+ this.info = FSImageSerialization.readCachePoolInfo(st);
readRpcIdsFromXml(st);
}
@@ -3284,8 +3177,8 @@ public String toString() {
if (info.getMode() != null) {
fields.add("mode=" + info.getMode().toString());
}
- if (info.getWeight() != null) {
- fields.add("weight=" + info.getWeight());
+ if (info.getLimit() != null) {
+ fields.add("limit=" + info.getLimit());
}
builder.append(Joiner.on(",").join(fields));
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
@@ -3757,41 +3650,4 @@ public static FsPermission fsPermissionFromXml(Stanza st)
short mode = Short.valueOf(st.getValue("MODE"));
return new FsPermission(mode);
}
-
- public static void cachePoolInfoToXml(ContentHandler contentHandler,
- CachePoolInfo info) throws SAXException {
- XMLUtils.addSaxString(contentHandler, "POOLNAME", info.getPoolName());
- if (info.getOwnerName() != null) {
- XMLUtils.addSaxString(contentHandler, "OWNERNAME", info.getOwnerName());
- }
- if (info.getGroupName() != null) {
- XMLUtils.addSaxString(contentHandler, "GROUPNAME", info.getGroupName());
- }
- if (info.getMode() != null) {
- fsPermissionToXml(contentHandler, info.getMode());
- }
- if (info.getWeight() != null) {
- XMLUtils.addSaxString(contentHandler, "WEIGHT",
- Integer.toString(info.getWeight()));
- }
- }
-
- public static CachePoolInfo cachePoolInfoFromXml(Stanza st)
- throws InvalidXmlException {
- String poolName = st.getValue("POOLNAME");
- CachePoolInfo info = new CachePoolInfo(poolName);
- if (st.hasChildren("OWNERNAME")) {
- info.setOwnerName(st.getValue("OWNERNAME"));
- }
- if (st.hasChildren("GROUPNAME")) {
- info.setGroupName(st.getValue("GROUPNAME"));
- }
- if (st.hasChildren("MODE")) {
- info.setMode(FSEditLogOp.fsPermissionFromXml(st));
- }
- if (st.hasChildren("WEIGHT")) {
- info.setWeight(Integer.parseInt(st.getValue("WEIGHT")));
- }
- return info;
- }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index 2166b780d8..9d3fbcb6f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -30,6 +30,8 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
@@ -38,11 +40,16 @@
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
+import org.apache.hadoop.hdfs.util.XMLUtils;
+import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
+import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.SAXException;
import com.google.common.base.Preconditions;
@@ -476,4 +483,202 @@ public static Block[] readCompactBlockArray(
}
return ret;
}
+
+ public static void writeCacheDirectiveInfo(DataOutputStream out,
+ CacheDirectiveInfo directive) throws IOException {
+ writeLong(directive.getId(), out);
+ int flags =
+ ((directive.getPath() != null) ? 0x1 : 0) |
+ ((directive.getReplication() != null) ? 0x2 : 0) |
+ ((directive.getPool() != null) ? 0x4 : 0) |
+ ((directive.getExpiration() != null) ? 0x8 : 0);
+ out.writeInt(flags);
+ if (directive.getPath() != null) {
+ writeString(directive.getPath().toUri().getPath(), out);
+ }
+ if (directive.getReplication() != null) {
+ writeShort(directive.getReplication(), out);
+ }
+ if (directive.getPool() != null) {
+ writeString(directive.getPool(), out);
+ }
+ if (directive.getExpiration() != null) {
+ writeLong(directive.getExpiration().getMillis(), out);
+ }
+ }
+
+ public static CacheDirectiveInfo readCacheDirectiveInfo(DataInput in)
+ throws IOException {
+ CacheDirectiveInfo.Builder builder =
+ new CacheDirectiveInfo.Builder();
+ builder.setId(readLong(in));
+ int flags = in.readInt();
+ if ((flags & 0x1) != 0) {
+ builder.setPath(new Path(readString(in)));
+ }
+ if ((flags & 0x2) != 0) {
+ builder.setReplication(readShort(in));
+ }
+ if ((flags & 0x4) != 0) {
+ builder.setPool(readString(in));
+ }
+ if ((flags & 0x8) != 0) {
+ builder.setExpiration(
+ CacheDirectiveInfo.Expiration.newAbsolute(readLong(in)));
+ }
+ if ((flags & ~0xF) != 0) {
+ throw new IOException("unknown flags set in " +
+ "ModifyCacheDirectiveInfoOp: " + flags);
+ }
+ return builder.build();
+ }
+
+ public static CacheDirectiveInfo readCacheDirectiveInfo(Stanza st)
+ throws InvalidXmlException {
+ CacheDirectiveInfo.Builder builder =
+ new CacheDirectiveInfo.Builder();
+ builder.setId(Long.parseLong(st.getValue("ID")));
+ String path = st.getValueOrNull("PATH");
+ if (path != null) {
+ builder.setPath(new Path(path));
+ }
+ String replicationString = st.getValueOrNull("REPLICATION");
+ if (replicationString != null) {
+ builder.setReplication(Short.parseShort(replicationString));
+ }
+ String pool = st.getValueOrNull("POOL");
+ if (pool != null) {
+ builder.setPool(pool);
+ }
+ String expiryTime = st.getValueOrNull("EXPIRATION");
+ if (expiryTime != null) {
+ builder.setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
+ Long.parseLong(expiryTime)));
+ }
+ return builder.build();
+ }
+
+ public static void writeCacheDirectiveInfo(ContentHandler contentHandler,
+ CacheDirectiveInfo directive) throws SAXException {
+ XMLUtils.addSaxString(contentHandler, "ID",
+ Long.toString(directive.getId()));
+ if (directive.getPath() != null) {
+ XMLUtils.addSaxString(contentHandler, "PATH",
+ directive.getPath().toUri().getPath());
+ }
+ if (directive.getReplication() != null) {
+ XMLUtils.addSaxString(contentHandler, "REPLICATION",
+ Short.toString(directive.getReplication()));
+ }
+ if (directive.getPool() != null) {
+ XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
+ }
+ if (directive.getExpiration() != null) {
+ XMLUtils.addSaxString(contentHandler, "EXPIRATION",
+ "" + directive.getExpiration().getMillis());
+ }
+ }
+
+ public static void writeCachePoolInfo(DataOutputStream out, CachePoolInfo info)
+ throws IOException {
+ writeString(info.getPoolName(), out);
+
+ final String ownerName = info.getOwnerName();
+ final String groupName = info.getGroupName();
+ final Long limit = info.getLimit();
+ final FsPermission mode = info.getMode();
+
+ boolean hasOwner, hasGroup, hasMode, hasLimit;
+ hasOwner = ownerName != null;
+ hasGroup = groupName != null;
+ hasMode = mode != null;
+ hasLimit = limit != null;
+
+ int flags =
+ (hasOwner ? 0x1 : 0) |
+ (hasGroup ? 0x2 : 0) |
+ (hasMode ? 0x4 : 0) |
+ (hasLimit ? 0x8 : 0);
+ writeInt(flags, out);
+
+ if (hasOwner) {
+ writeString(ownerName, out);
+ }
+ if (hasGroup) {
+ writeString(groupName, out);
+ }
+ if (hasMode) {
+ mode.write(out);
+ }
+ if (hasLimit) {
+ writeLong(limit, out);
+ }
+ }
+
+ public static CachePoolInfo readCachePoolInfo(DataInput in)
+ throws IOException {
+ String poolName = readString(in);
+ CachePoolInfo info = new CachePoolInfo(poolName);
+ int flags = readInt(in);
+ if ((flags & 0x1) != 0) {
+ info.setOwnerName(readString(in));
+ }
+ if ((flags & 0x2) != 0) {
+ info.setGroupName(readString(in));
+ }
+ if ((flags & 0x4) != 0) {
+ info.setMode(FsPermission.read(in));
+ }
+ if ((flags & 0x8) != 0) {
+ info.setLimit(readLong(in));
+ }
+ if ((flags & ~0xF) != 0) {
+ throw new IOException("Unknown flag in CachePoolInfo: " + flags);
+ }
+ return info;
+ }
+
+ public static void writeCachePoolInfo(ContentHandler contentHandler,
+ CachePoolInfo info) throws SAXException {
+ XMLUtils.addSaxString(contentHandler, "POOLNAME", info.getPoolName());
+
+ final String ownerName = info.getOwnerName();
+ final String groupName = info.getGroupName();
+ final Long limit = info.getLimit();
+ final FsPermission mode = info.getMode();
+
+ if (ownerName != null) {
+ XMLUtils.addSaxString(contentHandler, "OWNERNAME", ownerName);
+ }
+ if (groupName != null) {
+ XMLUtils.addSaxString(contentHandler, "GROUPNAME", groupName);
+ }
+ if (mode != null) {
+ FSEditLogOp.fsPermissionToXml(contentHandler, mode);
+ }
+ if (limit != null) {
+ XMLUtils.addSaxString(contentHandler, "LIMIT",
+ Long.toString(limit));
+ }
+ }
+
+ public static CachePoolInfo readCachePoolInfo(Stanza st)
+ throws InvalidXmlException {
+ String poolName = st.getValue("POOLNAME");
+ CachePoolInfo info = new CachePoolInfo(poolName);
+ if (st.hasChildren("OWNERNAME")) {
+ info.setOwnerName(st.getValue("OWNERNAME"));
+ }
+ if (st.hasChildren("GROUPNAME")) {
+ info.setGroupName(st.getValue("GROUPNAME"));
+ }
+ if (st.hasChildren("MODE")) {
+ info.setMode(FSEditLogOp.fsPermissionFromXml(st));
+ }
+ if (st.hasChildren("LIMIT")) {
+ info.setLimit(Long.parseLong(st.getValue("LIMIT")));
+ }
+ return info;
+ }
+
}
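
The writeCachePoolInfo/readCachePoolInfo pair above serializes optional fields behind a presence bitmask: one int of flags, then only the fields whose bits are set, with unknown bits rejected on read. A minimal self-contained sketch of that pattern using a hypothetical two-field record (not the actual HDFS wire format):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class OptionalFieldBitmaskSketch {
  // Write one int of presence flags, then only the fields that are present.
  static void write(DataOutputStream out, String owner, Long limit) throws IOException {
    int flags = (owner != null ? 0x1 : 0) | (limit != null ? 0x2 : 0);
    out.writeInt(flags);
    if (owner != null) { out.writeUTF(owner); }
    if (limit != null) { out.writeLong(limit); }
  }

  // Read the flags first, then each field whose bit is set; reject unknown bits.
  static void read(DataInputStream in) throws IOException {
    int flags = in.readInt();
    if ((flags & ~0x3) != 0) {
      throw new IOException("Unknown flags: " + flags);
    }
    String owner = ((flags & 0x1) != 0) ? in.readUTF() : null;
    Long limit = ((flags & 0x2) != 0) ? in.readLong() : null;
    System.out.println("owner=" + owner + ", limit=" + limit);
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    write(new DataOutputStream(bos), "hdfs", 1024L);
    read(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
  }
}
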
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index fd3d06b2da..d293006d97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -126,6 +126,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
@@ -7052,8 +7053,8 @@ void removeSnapshottableDirs(List<INodeDirectorySnapshottable> toRemove) {
}
}
- long addCacheDirective(
- CacheDirectiveInfo directive) throws IOException {
+ long addCacheDirective(CacheDirectiveInfo directive, EnumSet<CacheFlag> flags)
+ throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
@@ -7076,7 +7077,7 @@ long addCacheDirective(
"for this operation.");
}
CacheDirectiveInfo effectiveDirective =
- cacheManager.addDirective(directive, pc);
+ cacheManager.addDirective(directive, pc, flags);
getEditLog().logAddCacheDirectiveInfo(effectiveDirective,
cacheEntry != null);
result = effectiveDirective.getId();
@@ -7094,8 +7095,8 @@ long addCacheDirective(
return result;
}
- void modifyCacheDirective(
- CacheDirectiveInfo directive) throws IOException {
+ void modifyCacheDirective(CacheDirectiveInfo directive,
+ EnumSet<CacheFlag> flags) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
@@ -7111,7 +7112,7 @@ void modifyCacheDirective(
throw new SafeModeException(
"Cannot add cache directive", safeMode);
}
- cacheManager.modifyDirective(directive, pc);
+ cacheManager.modifyDirective(directive, pc, flags);
getEditLog().logModifyCacheDirectiveInfo(directive,
cacheEntry != null);
success = true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 84360e5eb4..af7262605a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -29,6 +29,7 @@
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -36,6 +37,7 @@
import org.apache.commons.logging.Log;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@@ -1239,14 +1241,14 @@ public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
@Override
public long addCacheDirective(
- CacheDirectiveInfo path) throws IOException {
- return namesystem.addCacheDirective(path);
+ CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
+ return namesystem.addCacheDirective(path, flags);
}
@Override
public void modifyCacheDirective(
- CacheDirectiveInfo directive) throws IOException {
- namesystem.modifyCacheDirective(directive);
+ CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
+ namesystem.modifyCacheDirective(directive, flags);
}
@Override
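
For reference, a hedged sketch of how a client could exercise the new EnumSet<CacheFlag> parameter end to end through DistributedFileSystem once this change is in place. The path and pool name are made up, and FORCE mirrors the CacheAdmin -force option by skipping the pool limit check:

import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;

public class AddDirectiveWithForceSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumes fs.defaultFS points at an HDFS cluster.
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder()
        .setPath(new Path("/data/hot"))   // hypothetical path
        .setPool("research")              // hypothetical pool
        .setReplication((short) 2)
        .build();
    // FORCE skips the pool limit check performed by CacheManager#checkLimit.
    long id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
    System.out.println("Added cache directive " + id);
  }
}
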
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java
index c6dc09360b..0e34db3c0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.tools;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
@@ -25,6 +26,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
@@ -135,6 +137,7 @@ public String getName() {
public String getShortUsage() {
return "[" + getName() +
" -path -pool " +
+ "[-force] " +
"[-replication ] [-ttl ]]\n";
}
@@ -146,6 +149,8 @@ public String getLongUsage() {
listing.addRow("", "The pool to which the directive will be " +
"added. You must have write permission on the cache pool "
+ "in order to add new directives.");
+ listing.addRow("-force",
+ "Skips checking of cache pool resource limits.");
listing.addRow("", "The cache replication factor to use. " +
"Defaults to 1.");
listing.addRow("", "How long the directive is " +
@@ -174,7 +179,7 @@ public int run(Configuration conf, List<String> args) throws IOException {
return 1;
}
builder.setPool(poolName);
-
+ boolean force = StringUtils.popOption("-force", args);
String replicationString =
StringUtils.popOptionWithArgument("-replication", args);
if (replicationString != null) {
@@ -201,8 +206,12 @@ public int run(Configuration conf, List<String> args) throws IOException {
DistributedFileSystem dfs = getDFS(conf);
CacheDirectiveInfo directive = builder.build();
+ EnumSet<CacheFlag> flags = EnumSet.noneOf(CacheFlag.class);
+ if (force) {
+ flags.add(CacheFlag.FORCE);
+ }
try {
- long id = dfs.addCacheDirective(directive);
+ long id = dfs.addCacheDirective(directive, flags);
System.out.println("Added cache directive " + id);
} catch (IOException e) {
System.err.println(prettifyException(e));
@@ -282,7 +291,7 @@ public String getName() {
@Override
public String getShortUsage() {
return "[" + getName() +
- " -id [-path ] [-replication ] " +
+ " -id [-path ] [-force] [-replication ] " +
"[-pool ] [-ttl ]]\n";
}
@@ -292,6 +301,8 @@ public String getLongUsage() {
listing.addRow("", "The ID of the directive to modify (required)");
listing.addRow("", "A path to cache. The path can be " +
"a directory or a file. (optional)");
+ listing.addRow("-force",
+ "Skips checking of cache pool resource limits.");
listing.addRow("", "The cache replication factor to use. " +
"(optional)");
listing.addRow("", "The pool to which the directive will be " +
@@ -322,6 +333,7 @@ public int run(Configuration conf, List<String> args) throws IOException {
builder.setPath(new Path(path));
modified = true;
}
+ boolean force = StringUtils.popOption("-force", args);
String replicationString =
StringUtils.popOptionWithArgument("-replication", args);
if (replicationString != null) {
@@ -357,8 +369,12 @@ public int run(Configuration conf, List<String> args) throws IOException {
return 1;
}
DistributedFileSystem dfs = getDFS(conf);
+ EnumSet<CacheFlag> flags = EnumSet.noneOf(CacheFlag.class);
+ if (force) {
+ flags.add(CacheFlag.FORCE);
+ }
try {
- dfs.modifyCacheDirective(builder.build());
+ dfs.modifyCacheDirective(builder.build(), flags);
System.out.println("Modified cache directive " + idString);
} catch (IOException e) {
System.err.println(prettifyException(e));
@@ -536,7 +552,7 @@ public String getName() {
@Override
public String getShortUsage() {
return "[" + NAME + " [-owner ] " +
- "[-group ] [-mode ] [-weight ]]\n";
+ "[-group ] [-mode ] [-limit ]]\n";
}
@Override
@@ -551,11 +567,10 @@ public String getLongUsage() {
listing.addRow("", "UNIX-style permissions for the pool. " +
"Permissions are specified in octal, e.g. 0755. " +
"By default, this is set to " + String.format("0%03o",
- FsPermission.getCachePoolDefault().toShort()));
- listing.addRow("", "Weight of the pool. " +
- "This is a relative measure of the importance of the pool used " +
- "during cache resource management. By default, it is set to " +
- CachePool.DEFAULT_WEIGHT);
+ FsPermission.getCachePoolDefault().toShort()) + ".");
+ listing.addRow("", "The maximum number of bytes that can be " +
+ "cached by directives in this pool, in aggregate. By default, " +
+ "no limit is set.");
return getShortUsage() + "\n" +
"Add a new cache pool.\n\n" +
@@ -564,34 +579,32 @@ public String getLongUsage() {
@Override
public int run(Configuration conf, List<String> args) throws IOException {
- String owner = StringUtils.popOptionWithArgument("-owner", args);
- if (owner == null) {
- owner = UserGroupInformation.getCurrentUser().getShortUserName();
- }
- String group = StringUtils.popOptionWithArgument("-group", args);
- if (group == null) {
- group = UserGroupInformation.getCurrentUser().getGroupNames()[0];
- }
- String modeString = StringUtils.popOptionWithArgument("-mode", args);
- int mode;
- if (modeString == null) {
- mode = FsPermission.getCachePoolDefault().toShort();
- } else {
- mode = Integer.parseInt(modeString, 8);
- }
- String weightString = StringUtils.popOptionWithArgument("-weight", args);
- int weight;
- if (weightString == null) {
- weight = CachePool.DEFAULT_WEIGHT;
- } else {
- weight = Integer.parseInt(weightString);
- }
String name = StringUtils.popFirstNonOption(args);
if (name == null) {
System.err.println("You must specify a name when creating a " +
"cache pool.");
return 1;
}
+ CachePoolInfo info = new CachePoolInfo(name);
+
+ String owner = StringUtils.popOptionWithArgument("-owner", args);
+ if (owner != null) {
+ info.setOwnerName(owner);
+ }
+ String group = StringUtils.popOptionWithArgument("-group", args);
+ if (group != null) {
+ info.setGroupName(group);
+ }
+ String modeString = StringUtils.popOptionWithArgument("-mode", args);
+ if (modeString != null) {
+ short mode = Short.parseShort(modeString, 8);
+ info.setMode(new FsPermission(mode));
+ }
+ String limitString = StringUtils.popOptionWithArgument("-limit", args);
+ if (limitString != null) {
+ long limit = Long.parseLong(limitString);
+ info.setLimit(limit);
+ }
if (!args.isEmpty()) {
System.err.print("Can't understand arguments: " +
Joiner.on(" ").join(args) + "\n");
@@ -599,11 +612,6 @@ public int run(Configuration conf, List<String> args) throws IOException {
return 1;
}
DistributedFileSystem dfs = getDFS(conf);
- CachePoolInfo info = new CachePoolInfo(name).
- setOwnerName(owner).
- setGroupName(group).
- setMode(new FsPermission((short)mode)).
- setWeight(weight);
try {
dfs.addCachePool(info);
} catch (IOException e) {
@@ -624,7 +632,7 @@ public String getName() {
@Override
public String getShortUsage() {
return "[" + getName() + " [-owner ] " +
- "[-group ] [-mode ] [-weight ]]\n";
+ "[-group ] [-mode ] [-limit ]]\n";
}
@Override
@@ -635,11 +643,12 @@ public String getLongUsage() {
listing.addRow("", "Username of the owner of the pool");
listing.addRow("", "Groupname of the group of the pool.");
listing.addRow("", "Unix-style permissions of the pool in octal.");
- listing.addRow("", "Weight of the pool.");
+ listing.addRow("", "Maximum number of bytes that can be cached " +
+ "by this pool.");
return getShortUsage() + "\n" +
WordUtils.wrap("Modifies the metadata of an existing cache pool. " +
- "See usage of " + AddCachePoolCommand.NAME + " for more details",
+ "See usage of " + AddCachePoolCommand.NAME + " for more details.",
MAX_LINE_WIDTH) + "\n\n" +
listing.toString();
}
@@ -651,9 +660,9 @@ public int run(Configuration conf, List<String> args) throws IOException {
String modeString = StringUtils.popOptionWithArgument("-mode", args);
Integer mode = (modeString == null) ?
null : Integer.parseInt(modeString, 8);
- String weightString = StringUtils.popOptionWithArgument("-weight", args);
- Integer weight = (weightString == null) ?
- null : Integer.parseInt(weightString);
+ String limitString = StringUtils.popOptionWithArgument("-limit", args);
+ Long limit = (limitString == null) ?
+ null : Long.parseLong(limitString);
String name = StringUtils.popFirstNonOption(args);
if (name == null) {
System.err.println("You must specify a name when creating a " +
@@ -680,8 +689,8 @@ public int run(Configuration conf, List<String> args) throws IOException {
info.setMode(new FsPermission(mode.shortValue()));
changed = true;
}
- if (weight != null) {
- info.setWeight(weight);
+ if (limit != null) {
+ info.setLimit(limit);
changed = true;
}
if (!changed) {
@@ -709,8 +718,8 @@ public int run(Configuration conf, List<String> args) throws IOException {
System.out.print(prefix + "mode " + new FsPermission(mode.shortValue()));
prefix = " and ";
}
- if (weight != null) {
- System.out.print(prefix + "weight " + weight);
+ if (limit != null) {
+ System.out.print(prefix + "limit " + limit);
prefix = " and ";
}
System.out.print("\n");
@@ -804,11 +813,12 @@ public int run(Configuration conf, List<String> args) throws IOException {
addField("OWNER", Justification.LEFT).
addField("GROUP", Justification.LEFT).
addField("MODE", Justification.LEFT).
- addField("WEIGHT", Justification.RIGHT);
+ addField("LIMIT", Justification.RIGHT);
if (printStats) {
builder.
addField("BYTES_NEEDED", Justification.RIGHT).
addField("BYTES_CACHED", Justification.RIGHT).
+ addField("BYTES_OVERLIMIT", Justification.RIGHT).
addField("FILES_NEEDED", Justification.RIGHT).
addField("FILES_CACHED", Justification.RIGHT);
}
@@ -825,12 +835,19 @@ public int run(Configuration conf, List<String> args) throws IOException {
row.add(info.getOwnerName());
row.add(info.getGroupName());
row.add(info.getMode() != null ? info.getMode().toString() : null);
- row.add(
- info.getWeight() != null ? info.getWeight().toString() : null);
+ Long limit = info.getLimit();
+ String limitString;
+ if (limit != null && limit.equals(CachePool.DEFAULT_LIMIT)) {
+ limitString = "unlimited";
+ } else {
+ limitString = "" + limit;
+ }
+ row.add(limitString);
if (printStats) {
CachePoolStats stats = entry.getStats();
row.add(Long.toString(stats.getBytesNeeded()));
row.add(Long.toString(stats.getBytesCached()));
+ row.add(Long.toString(stats.getBytesOverlimit()));
row.add(Long.toString(stats.getFilesNeeded()));
row.add(Long.toString(stats.getFilesCached()));
}
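
The new -limit pool option corresponds to CachePoolInfo.setLimit(long) for callers that manage pools programmatically. A minimal sketch under that assumption; the pool name, mode, and byte values below are illustrative, not taken from the patch:

    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

    public class CachePoolLimitSketch {
      /** Roughly equivalent to: hdfs cacheadmin -addPool research -mode 0755 -limit 1073741824 */
      static void addLimitedPool(DistributedFileSystem dfs) throws Exception {
        CachePoolInfo info = new CachePoolInfo("research")
            .setMode(new FsPermission((short) 0755))
            .setLimit(1024L * 1024 * 1024);   // aggregate bytes cacheable by this pool
        dfs.addCachePool(info);
      }

      /** Roughly equivalent to: hdfs cacheadmin -modifyPool research -limit 2147483648 */
      static void raiseLimit(DistributedFileSystem dfs) throws Exception {
        dfs.modifyCachePool(new CachePoolInfo("research")
            .setLimit(2L * 1024 * 1024 * 1024));
      }
    }
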
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index 11eddc326f..ee1d10415b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -385,8 +385,13 @@ message CacheDirectiveStatsProto {
required bool hasExpired = 5;
}
+enum CacheFlagProto {
+ FORCE = 0x01; // Ignore pool resource limits
+}
+
message AddCacheDirectiveRequestProto {
required CacheDirectiveInfoProto info = 1;
+ optional uint32 cacheFlags = 2; // bits set using CacheFlag
}
message AddCacheDirectiveResponseProto {
@@ -395,6 +400,7 @@ message AddCacheDirectiveResponseProto {
message ModifyCacheDirectiveRequestProto {
required CacheDirectiveInfoProto info = 1;
+ optional uint32 cacheFlags = 2; // bits set using CacheFlag
}
message ModifyCacheDirectiveResponseProto {
@@ -427,14 +433,15 @@ message CachePoolInfoProto {
optional string ownerName = 2;
optional string groupName = 3;
optional int32 mode = 4;
- optional int32 weight = 5;
+ optional int64 limit = 5;
}
message CachePoolStatsProto {
required int64 bytesNeeded = 1;
required int64 bytesCached = 2;
- required int64 filesNeeded = 3;
- required int64 filesCached = 4;
+ required int64 bytesOverlimit = 3;
+ required int64 filesNeeded = 4;
+ required int64 filesCached = 5;
}
message AddCachePoolRequestProto {
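
On the wire, the flag set is carried as a plain bitmask in the optional cacheFlags field, using the bit values declared in CacheFlagProto. A hedged sketch of the client-to-wire direction is below; packCacheFlags is an illustrative helper name rather than HDFS code, and CacheFlagProto.FORCE_VALUE is the integer constant protoc generates for the FORCE = 0x01 entry.

    import java.util.EnumSet;

    import org.apache.hadoop.fs.CacheFlag;
    import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheFlagProto;

    public class CacheFlagEncodingSketch {
      /** Packs an EnumSet of CacheFlag into the uint32 bitmask carried by cacheFlags. */
      static int packCacheFlags(EnumSet<CacheFlag> flags) {
        int packed = 0;
        if (flags.contains(CacheFlag.FORCE)) {
          packed |= CacheFlagProto.FORCE_VALUE;   // 0x01, as declared in the .proto
        }
        return packed;
      }
    }
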
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index ad452f9733..74152e2779 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1036,20 +1036,20 @@ public static void runOperations(MiniDFSCluster cluster,
// OP_ADD_CACHE_POOL
filesystem.addCachePool(new CachePoolInfo("pool1"));
// OP_MODIFY_CACHE_POOL
- filesystem.modifyCachePool(new CachePoolInfo("pool1").setWeight(99));
+ filesystem.modifyCachePool(new CachePoolInfo("pool1").setLimit(99l));
// OP_ADD_PATH_BASED_CACHE_DIRECTIVE
long id = filesystem.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/path")).
setReplication((short)1).
setPool("pool1").
- build());
+ build(), EnumSet.of(CacheFlag.FORCE));
// OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE
filesystem.modifyCacheDirective(
new CacheDirectiveInfo.Builder().
setId(id).
setReplication((short)2).
- build());
+ build(), EnumSet.of(CacheFlag.FORCE));
// OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE
filesystem.removeCacheDirective(id);
// OP_REMOVE_CACHE_POOL
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
index 9a3572b5ba..43a4af1fe8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
@@ -239,7 +239,7 @@ public Object run() throws IOException, InterruptedException {
.setOwnerName("carlton")
.setGroupName("party")
.setMode(new FsPermission((short)0700))
- .setWeight(1989));
+ .setLimit(1989l));
// OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
long id = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
index eb5f7a0d57..6dbbb8363e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
@@ -34,6 +34,7 @@
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date;
+import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -43,6 +44,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
@@ -92,25 +94,48 @@ public class TestCacheDirectives {
static private MiniDFSCluster cluster;
static private DistributedFileSystem dfs;
static private NamenodeProtocols proto;
+ static private NameNode namenode;
static private CacheManipulator prevCacheManipulator;
static {
EditLogFileOutputStream.setShouldSkipFsyncForTesting(false);
}
- @Before
- public void setup() throws Exception {
- conf = new HdfsConfiguration();
+ private static final long BLOCK_SIZE = 512;
+ private static final int NUM_DATANODES = 4;
+ // Most Linux installs will allow non-root users to lock 64KB.
+ // In this test though, we stub out mlock so this doesn't matter.
+ private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
+
+ private static HdfsConfiguration createCachingConf() {
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
+ conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
+ conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
+ conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
// set low limits here for testing purposes
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2);
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, 2);
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES,
+ 2);
+
+ return conf;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ conf = createCachingConf();
+ cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
proto = cluster.getNameNodeRpc();
+ namenode = cluster.getNameNode();
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
- LogManager.getLogger(CacheReplicationMonitor.class).setLevel(Level.TRACE);
+ LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
+ Level.TRACE);
}
@After
@@ -127,7 +152,7 @@ public void testBasicPoolOperations() throws Exception {
final String poolName = "pool1";
CachePoolInfo info = new CachePoolInfo(poolName).
setOwnerName("bob").setGroupName("bobgroup").
- setMode(new FsPermission((short)0755)).setWeight(150);
+ setMode(new FsPermission((short)0755)).setLimit(150l);
// Add a pool
dfs.addCachePool(info);
@@ -168,7 +193,7 @@ public void testBasicPoolOperations() throws Exception {
// Modify the pool
info.setOwnerName("jane").setGroupName("janegroup")
- .setMode(new FsPermission((short)0700)).setWeight(314);
+ .setMode(new FsPermission((short)0700)).setLimit(314l);
dfs.modifyCachePool(info);
// Do some invalid modify pools
@@ -263,10 +288,10 @@ public void testCreateAndModifyPools() throws Exception {
String ownerName = "abc";
String groupName = "123";
FsPermission mode = new FsPermission((short)0755);
- int weight = 150;
+ long limit = 150;
dfs.addCachePool(new CachePoolInfo(poolName).
setOwnerName(ownerName).setGroupName(groupName).
- setMode(mode).setWeight(weight));
+ setMode(mode).setLimit(limit));
RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
CachePoolInfo info = iter.next().getInfo();
@@ -277,10 +302,10 @@ public void testCreateAndModifyPools() throws Exception {
ownerName = "def";
groupName = "456";
mode = new FsPermission((short)0700);
- weight = 151;
+ limit = 151;
dfs.modifyCachePool(new CachePoolInfo(poolName).
setOwnerName(ownerName).setGroupName(groupName).
- setMode(mode).setWeight(weight));
+ setMode(mode).setLimit(limit));
iter = dfs.listCachePools();
info = iter.next().getInfo();
@@ -288,7 +313,7 @@ public void testCreateAndModifyPools() throws Exception {
assertEquals(ownerName, info.getOwnerName());
assertEquals(groupName, info.getGroupName());
assertEquals(mode, info.getMode());
- assertEquals(Integer.valueOf(weight), info.getWeight());
+ assertEquals(limit, (long)info.getLimit());
dfs.removeCachePool(poolName);
iter = dfs.listCachePools();
@@ -495,30 +520,22 @@ public void testAddRemoveDirectives() throws Exception {
@Test(timeout=60000)
public void testCacheManagerRestart() throws Exception {
- cluster.shutdown();
- cluster = null;
- HdfsConfiguration conf = createCachingConf();
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
-
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
-
// Create and validate a pool
final String pool = "poolparty";
String groupName = "partygroup";
FsPermission mode = new FsPermission((short)0777);
- int weight = 747;
+ long limit = 747;
dfs.addCachePool(new CachePoolInfo(pool)
.setGroupName(groupName)
.setMode(mode)
- .setWeight(weight));
+ .setLimit(limit));
RemoteIterator<CachePoolEntry> pit = dfs.listCachePools();
assertTrue("No cache pools found", pit.hasNext());
CachePoolInfo info = pit.next().getInfo();
assertEquals(pool, info.getPoolName());
assertEquals(groupName, info.getGroupName());
assertEquals(mode, info.getMode());
- assertEquals(weight, (int)info.getWeight());
+ assertEquals(limit, (long)info.getLimit());
assertFalse("Unexpected # of cache pools found", pit.hasNext());
// Create some cache entries
@@ -556,7 +573,7 @@ public void testCacheManagerRestart() throws Exception {
assertEquals(pool, info.getPoolName());
assertEquals(groupName, info.getGroupName());
assertEquals(mode, info.getMode());
- assertEquals(weight, (int)info.getWeight());
+ assertEquals(limit, (long)info.getLimit());
assertFalse("Unexpected # of cache pools found", pit.hasNext());
dit = dfs.listCacheDirectives(null);
@@ -762,91 +779,64 @@ private static void checkNumCachedReplicas(final DistributedFileSystem dfs,
numCachedReplicas);
}
- private static final long BLOCK_SIZE = 512;
- private static final int NUM_DATANODES = 4;
-
- // Most Linux installs will allow non-root users to lock 64KB.
- private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
-
- private static HdfsConfiguration createCachingConf() {
- HdfsConfiguration conf = new HdfsConfiguration();
- conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
- conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
- conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
- conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
- conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
- conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
- return conf;
- }
-
@Test(timeout=120000)
public void testWaitForCachedReplicas() throws Exception {
- HdfsConfiguration conf = createCachingConf();
FileSystemTestHelper helper = new FileSystemTestHelper();
- MiniDFSCluster cluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return ((namenode.getNamesystem().getCacheCapacity() ==
+ (NUM_DATANODES * CACHE_CAPACITY)) &&
+ (namenode.getNamesystem().getCacheUsed() == 0));
+ }
+ }, 500, 60000);
- try {
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- final NameNode namenode = cluster.getNameNode();
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- return ((namenode.getNamesystem().getCacheCapacity() ==
- (NUM_DATANODES * CACHE_CAPACITY)) &&
- (namenode.getNamesystem().getCacheUsed() == 0));
- }
- }, 500, 60000);
-
- NamenodeProtocols nnRpc = namenode.getRpcServer();
- Path rootDir = helper.getDefaultWorkingDirectory(dfs);
- // Create the pool
- final String pool = "friendlyPool";
- nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
- // Create some test files
- final int numFiles = 2;
- final int numBlocksPerFile = 2;
- final List paths = new ArrayList(numFiles);
- for (int i=0; i entries =
- new CacheDirectiveIterator(nnRpc, null);
- for (int i=0; i paths = new ArrayList(numFiles);
+ for (int i=0; i entries =
+ new CacheDirectiveIterator(nnRpc, null);
+ for (int i=0; i paths = new LinkedList();
- paths.add(new Path("/foo/bar"));
- paths.add(new Path("/foo/baz"));
- paths.add(new Path("/foo2/bar2"));
- paths.add(new Path("/foo2/baz2"));
- dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
- dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
- final int numBlocksPerFile = 2;
- for (Path path : paths) {
- FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
- (int)BLOCK_SIZE, (short)3, false);
- }
- waitForCachedBlocks(namenode, 0, 0,
- "testWaitForCachedReplicasInDirectory:0");
-
- // cache entire directory
- long id = dfs.addCacheDirective(
- new CacheDirectiveInfo.Builder().
- setPath(new Path("/foo")).
- setReplication((short)2).
- setPool(pool).
- build());
- waitForCachedBlocks(namenode, 4, 8,
- "testWaitForCachedReplicasInDirectory:1:blocks");
- // Verify that listDirectives gives the stats we want.
- waitForCacheDirectiveStats(dfs,
- 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
- 2, 2,
- new CacheDirectiveInfo.Builder().
- setPath(new Path("/foo")).
- build(),
- "testWaitForCachedReplicasInDirectory:1:directive");
- waitForCachePoolStats(dfs,
- 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
- 2, 2,
- poolInfo, "testWaitForCachedReplicasInDirectory:1:pool");
-
- long id2 = dfs.addCacheDirective(
- new CacheDirectiveInfo.Builder().
- setPath(new Path("/foo/bar")).
- setReplication((short)4).
- setPool(pool).
- build());
- // wait for an additional 2 cached replicas to come up
- waitForCachedBlocks(namenode, 4, 10,
- "testWaitForCachedReplicasInDirectory:2:blocks");
- // the directory directive's stats are unchanged
- waitForCacheDirectiveStats(dfs,
- 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
- 2, 2,
- new CacheDirectiveInfo.Builder().
- setPath(new Path("/foo")).
- build(),
- "testWaitForCachedReplicasInDirectory:2:directive-1");
- // verify /foo/bar's stats
- waitForCacheDirectiveStats(dfs,
- 4 * numBlocksPerFile * BLOCK_SIZE,
- // only 3 because the file only has 3 replicas, not 4 as requested.
- 3 * numBlocksPerFile * BLOCK_SIZE,
- 1,
- // only 0 because the file can't be fully cached
- 0,
- new CacheDirectiveInfo.Builder().
- setPath(new Path("/foo/bar")).
- build(),
- "testWaitForCachedReplicasInDirectory:2:directive-2");
- waitForCachePoolStats(dfs,
- (4+4) * numBlocksPerFile * BLOCK_SIZE,
- (4+3) * numBlocksPerFile * BLOCK_SIZE,
- 3, 2,
- poolInfo, "testWaitForCachedReplicasInDirectory:2:pool");
-
- // remove and watch numCached go to 0
- dfs.removeCacheDirective(id);
- dfs.removeCacheDirective(id2);
- waitForCachedBlocks(namenode, 0, 0,
- "testWaitForCachedReplicasInDirectory:3:blocks");
- waitForCachePoolStats(dfs,
- 0, 0,
- 0, 0,
- poolInfo, "testWaitForCachedReplicasInDirectory:3:pool");
- } finally {
- cluster.shutdown();
+ // Create the pool
+ final String pool = "friendlyPool";
+ final CachePoolInfo poolInfo = new CachePoolInfo(pool);
+ dfs.addCachePool(poolInfo);
+ // Create some test files
+ final List<Path> paths = new LinkedList<Path>();
+ paths.add(new Path("/foo/bar"));
+ paths.add(new Path("/foo/baz"));
+ paths.add(new Path("/foo2/bar2"));
+ paths.add(new Path("/foo2/baz2"));
+ dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+ dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+ final int numBlocksPerFile = 2;
+ for (Path path : paths) {
+ FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+ (int)BLOCK_SIZE, (short)3, false);
}
+ waitForCachedBlocks(namenode, 0, 0,
+ "testWaitForCachedReplicasInDirectory:0");
+
+ // cache entire directory
+ long id = dfs.addCacheDirective(
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo")).
+ setReplication((short)2).
+ setPool(pool).
+ build());
+ waitForCachedBlocks(namenode, 4, 8,
+ "testWaitForCachedReplicasInDirectory:1:blocks");
+ // Verify that listDirectives gives the stats we want.
+ waitForCacheDirectiveStats(dfs,
+ 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
+ 2, 2,
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo")).
+ build(),
+ "testWaitForCachedReplicasInDirectory:1:directive");
+ waitForCachePoolStats(dfs,
+ 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
+ 2, 2,
+ poolInfo, "testWaitForCachedReplicasInDirectory:1:pool");
+
+ long id2 = dfs.addCacheDirective(
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo/bar")).
+ setReplication((short)4).
+ setPool(pool).
+ build());
+ // wait for an additional 2 cached replicas to come up
+ waitForCachedBlocks(namenode, 4, 10,
+ "testWaitForCachedReplicasInDirectory:2:blocks");
+ // the directory directive's stats are unchanged
+ waitForCacheDirectiveStats(dfs,
+ 4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
+ 2, 2,
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo")).
+ build(),
+ "testWaitForCachedReplicasInDirectory:2:directive-1");
+ // verify /foo/bar's stats
+ waitForCacheDirectiveStats(dfs,
+ 4 * numBlocksPerFile * BLOCK_SIZE,
+ // only 3 because the file only has 3 replicas, not 4 as requested.
+ 3 * numBlocksPerFile * BLOCK_SIZE,
+ 1,
+ // only 0 because the file can't be fully cached
+ 0,
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo/bar")).
+ build(),
+ "testWaitForCachedReplicasInDirectory:2:directive-2");
+ waitForCachePoolStats(dfs,
+ (4+4) * numBlocksPerFile * BLOCK_SIZE,
+ (4+3) * numBlocksPerFile * BLOCK_SIZE,
+ 3, 2,
+ poolInfo, "testWaitForCachedReplicasInDirectory:2:pool");
+
+ // remove and watch numCached go to 0
+ dfs.removeCacheDirective(id);
+ dfs.removeCacheDirective(id2);
+ waitForCachedBlocks(namenode, 0, 0,
+ "testWaitForCachedReplicasInDirectory:3:blocks");
+ waitForCachePoolStats(dfs,
+ 0, 0,
+ 0, 0,
+ poolInfo, "testWaitForCachedReplicasInDirectory:3:pool");
}
/**
@@ -1000,68 +979,57 @@ public void testWaitForCachedReplicasInDirectory() throws Exception {
*/
@Test(timeout=120000)
public void testReplicationFactor() throws Exception {
- HdfsConfiguration conf = createCachingConf();
- MiniDFSCluster cluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
-
- try {
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- NameNode namenode = cluster.getNameNode();
- // Create the pool
- final String pool = "friendlyPool";
- dfs.addCachePool(new CachePoolInfo(pool));
- // Create some test files
- final List<Path> paths = new LinkedList<Path>();
- paths.add(new Path("/foo/bar"));
- paths.add(new Path("/foo/baz"));
- paths.add(new Path("/foo2/bar2"));
- paths.add(new Path("/foo2/baz2"));
- dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
- dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
- final int numBlocksPerFile = 2;
- for (Path path : paths) {
- FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
- (int)BLOCK_SIZE, (short)3, false);
- }
- waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
- checkNumCachedReplicas(dfs, paths, 0, 0);
- // cache directory
- long id = dfs.addCacheDirective(
- new CacheDirectiveInfo.Builder().
- setPath(new Path("/foo")).
- setReplication((short)1).
- setPool(pool).
- build());
- waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1");
- checkNumCachedReplicas(dfs, paths, 4, 4);
- // step up the replication factor
- for (int i=2; i<=3; i++) {
- dfs.modifyCacheDirective(
- new CacheDirectiveInfo.Builder().
- setId(id).
- setReplication((short)i).
- build());
- waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2");
- checkNumCachedReplicas(dfs, paths, 4, 4*i);
- }
- // step it down
- for (int i=2; i>=1; i--) {
- dfs.modifyCacheDirective(
- new CacheDirectiveInfo.Builder().
- setId(id).
- setReplication((short)i).
- build());
- waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3");
- checkNumCachedReplicas(dfs, paths, 4, 4*i);
- }
- // remove and watch numCached go to 0
- dfs.removeCacheDirective(id);
- waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
- checkNumCachedReplicas(dfs, paths, 0, 0);
- } finally {
- cluster.shutdown();
+ // Create the pool
+ final String pool = "friendlyPool";
+ dfs.addCachePool(new CachePoolInfo(pool));
+ // Create some test files
+ final List<Path> paths = new LinkedList<Path>();
+ paths.add(new Path("/foo/bar"));
+ paths.add(new Path("/foo/baz"));
+ paths.add(new Path("/foo2/bar2"));
+ paths.add(new Path("/foo2/baz2"));
+ dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+ dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+ final int numBlocksPerFile = 2;
+ for (Path path : paths) {
+ FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+ (int)BLOCK_SIZE, (short)3, false);
}
+ waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
+ checkNumCachedReplicas(dfs, paths, 0, 0);
+ // cache directory
+ long id = dfs.addCacheDirective(
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo")).
+ setReplication((short)1).
+ setPool(pool).
+ build());
+ waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1");
+ checkNumCachedReplicas(dfs, paths, 4, 4);
+ // step up the replication factor
+ for (int i=2; i<=3; i++) {
+ dfs.modifyCacheDirective(
+ new CacheDirectiveInfo.Builder().
+ setId(id).
+ setReplication((short)i).
+ build());
+ waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2");
+ checkNumCachedReplicas(dfs, paths, 4, 4*i);
+ }
+ // step it down
+ for (int i=2; i>=1; i--) {
+ dfs.modifyCacheDirective(
+ new CacheDirectiveInfo.Builder().
+ setId(id).
+ setReplication((short)i).
+ build());
+ waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3");
+ checkNumCachedReplicas(dfs, paths, 4, 4*i);
+ }
+ // remove and watch numCached go to 0
+ dfs.removeCacheDirective(id);
+ waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
+ checkNumCachedReplicas(dfs, paths, 0, 0);
}
@Test(timeout=60000)
@@ -1081,11 +1049,12 @@ public void testListCachePoolPermissions() throws Exception {
assertNull("Unexpected owner name", info.getOwnerName());
assertNull("Unexpected group name", info.getGroupName());
assertNull("Unexpected mode", info.getMode());
- assertNull("Unexpected weight", info.getWeight());
+ assertNull("Unexpected limit", info.getLimit());
// Modify the pool so myuser is now the owner
+ final long limit = 99;
dfs.modifyCachePool(new CachePoolInfo(poolName)
.setOwnerName(myUser.getShortUserName())
- .setWeight(99));
+ .setLimit(limit));
// Should see full info
it = myDfs.listCachePools();
info = it.next().getInfo();
@@ -1096,60 +1065,127 @@ public void testListCachePoolPermissions() throws Exception {
assertNotNull("Expected group name", info.getGroupName());
assertEquals("Mismatched mode", (short) 0700,
info.getMode().toShort());
- assertEquals("Mismatched weight", 99, (int)info.getWeight());
+ assertEquals("Mismatched limit", limit, (long)info.getLimit());
}
- @Test(timeout=60000)
+ @Test(timeout=120000)
public void testExpiry() throws Exception {
- HdfsConfiguration conf = createCachingConf();
- MiniDFSCluster cluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+ String pool = "pool1";
+ dfs.addCachePool(new CachePoolInfo(pool));
+ Path p = new Path("/mypath");
+ DFSTestUtil.createFile(dfs, p, BLOCK_SIZE*2, (short)2, 0x999);
+ // Expire after test timeout
+ Date start = new Date();
+ Date expiry = DateUtils.addSeconds(start, 120);
+ final long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+ .setPath(p)
+ .setPool(pool)
+ .setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiry))
+ .setReplication((short)2)
+ .build());
+ waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:1");
+ // Change it to expire sooner
+ dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+ .setExpiration(Expiration.newRelative(0)).build());
+ waitForCachedBlocks(cluster.getNameNode(), 0, 0, "testExpiry:2");
+ RemoteIterator<CacheDirectiveEntry> it = dfs.listCacheDirectives(null);
+ CacheDirectiveEntry ent = it.next();
+ assertFalse(it.hasNext());
+ Date entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
+ assertTrue("Directive should have expired",
+ entryExpiry.before(new Date()));
+ // Change it back to expire later
+ dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+ .setExpiration(Expiration.newRelative(120000)).build());
+ waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:3");
+ it = dfs.listCacheDirectives(null);
+ ent = it.next();
+ assertFalse(it.hasNext());
+ entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
+ assertTrue("Directive should not have expired",
+ entryExpiry.after(new Date()));
+ // Verify that setting a negative TTL throws an error
try {
- DistributedFileSystem dfs = cluster.getFileSystem();
- String pool = "pool1";
- dfs.addCachePool(new CachePoolInfo(pool));
- Path p = new Path("/mypath");
- DFSTestUtil.createFile(dfs, p, BLOCK_SIZE*2, (short)2, 0x999);
- // Expire after test timeout
- Date start = new Date();
- Date expiry = DateUtils.addSeconds(start, 120);
- final long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
- .setPath(p)
- .setPool(pool)
- .setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiry))
- .setReplication((short)2)
- .build());
- waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:1");
- // Change it to expire sooner
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
- .setExpiration(Expiration.newRelative(0)).build());
- waitForCachedBlocks(cluster.getNameNode(), 0, 0, "testExpiry:2");
- RemoteIterator<CacheDirectiveEntry> it = dfs.listCacheDirectives(null);
- CacheDirectiveEntry ent = it.next();
- assertFalse(it.hasNext());
- Date entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
- assertTrue("Directive should have expired",
- entryExpiry.before(new Date()));
- // Change it back to expire later
- dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
- .setExpiration(Expiration.newRelative(120000)).build());
- waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:3");
- it = dfs.listCacheDirectives(null);
- ent = it.next();
- assertFalse(it.hasNext());
- entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
- assertTrue("Directive should not have expired",
- entryExpiry.after(new Date()));
- // Verify that setting a negative TTL throws an error
- try {
- dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
- .setExpiration(Expiration.newRelative(-1)).build());
- } catch (InvalidRequestException e) {
- GenericTestUtils
- .assertExceptionContains("Cannot set a negative expiration", e);
- }
- } finally {
- cluster.shutdown();
+ .setExpiration(Expiration.newRelative(-1)).build());
+ } catch (InvalidRequestException e) {
+ GenericTestUtils
+ .assertExceptionContains("Cannot set a negative expiration", e);
}
}
+
+ @Test(timeout=120000)
+ public void testLimit() throws Exception {
+ try {
+ dfs.addCachePool(new CachePoolInfo("poolofnegativity").setLimit(-99l));
+ fail("Should not be able to set a negative limit");
+ } catch (InvalidRequestException e) {
+ GenericTestUtils.assertExceptionContains("negative", e);
+ }
+ final String destiny = "poolofdestiny";
+ final Path path1 = new Path("/destiny");
+ DFSTestUtil.createFile(dfs, path1, 2*BLOCK_SIZE, (short)1, 0x9494);
+ // Start off with a limit that is too small
+ final CachePoolInfo poolInfo = new CachePoolInfo(destiny)
+ .setLimit(2*BLOCK_SIZE-1);
+ dfs.addCachePool(poolInfo);
+ final CacheDirectiveInfo info1 = new CacheDirectiveInfo.Builder()
+ .setPool(destiny).setPath(path1).build();
+ try {
+ dfs.addCacheDirective(info1);
+ fail("Should not be able to cache when there is no more limit");
+ } catch (InvalidRequestException e) {
+ GenericTestUtils.assertExceptionContains("remaining capacity", e);
+ }
+ // Raise the limit up to fit and it should work this time
+ poolInfo.setLimit(2*BLOCK_SIZE);
+ dfs.modifyCachePool(poolInfo);
+ long id1 = dfs.addCacheDirective(info1);
+ waitForCachePoolStats(dfs,
+ 2*BLOCK_SIZE, 2*BLOCK_SIZE,
+ 1, 1,
+ poolInfo, "testLimit:1");
+ // Adding another file, it shouldn't be cached
+ final Path path2 = new Path("/failure");
+ DFSTestUtil.createFile(dfs, path2, BLOCK_SIZE, (short)1, 0x9495);
+ try {
+ dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+ .setPool(destiny).setPath(path2).build(),
+ EnumSet.noneOf(CacheFlag.class));
+ fail("Should not be able to add another cached file");
+ } catch (InvalidRequestException e) {
+ GenericTestUtils.assertExceptionContains("remaining capacity", e);
+ }
+ // Bring the limit down, the first file should get uncached
+ poolInfo.setLimit(BLOCK_SIZE);
+ dfs.modifyCachePool(poolInfo);
+ waitForCachePoolStats(dfs,
+ 2*BLOCK_SIZE, 0,
+ 1, 0,
+ poolInfo, "testLimit:2");
+ RemoteIterator<CachePoolEntry> it = dfs.listCachePools();
+ assertTrue("Expected a cache pool", it.hasNext());
+ CachePoolStats stats = it.next().getStats();
+ assertEquals("Overlimit bytes should be difference of needed and limit",
+ BLOCK_SIZE, stats.getBytesOverlimit());
+ // Moving a directive to a pool without enough limit should fail
+ CachePoolInfo inadequate =
+ new CachePoolInfo("poolofinadequacy").setLimit(BLOCK_SIZE);
+ dfs.addCachePool(inadequate);
+ try {
+ dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(info1)
+ .setId(id1).setPool(inadequate.getPoolName()).build(),
+ EnumSet.noneOf(CacheFlag.class));
+ } catch(InvalidRequestException e) {
+ GenericTestUtils.assertExceptionContains("remaining capacity", e);
+ }
+ // Succeeds when force=true
+ dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(info1).setId(id1)
+ .setPool(inadequate.getPoolName()).build(),
+ EnumSet.of(CacheFlag.FORCE));
+ // Also can add with force=true
+ dfs.addCacheDirective(
+ new CacheDirectiveInfo.Builder().setPool(inadequate.getPoolName())
+ .setPath(path1).build(), EnumSet.of(CacheFlag.FORCE));
+ }
}
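
The accounting exercised by testLimit is simple arithmetic: a pool's bytesNeeded is the aggregate size its directives request, anything above the configured limit is reported as bytesOverlimit, and new directives are rejected once no capacity remains unless CacheFlag.FORCE is passed. A standalone sketch of that arithmetic, illustrative only and not the CacheManager implementation:

    public class PoolLimitAccountingSketch {
      /** Bytes requested beyond the pool's limit; zero when under the limit. */
      static long bytesOverlimit(long bytesNeeded, long limit) {
        return Math.max(0, bytesNeeded - limit);
      }

      /** Capacity still available to new directives (the check FORCE bypasses). */
      static long remainingCapacity(long bytesNeeded, long limit) {
        return Math.max(0, limit - bytesNeeded);
      }

      public static void main(String[] args) {
        long blockSize = 512;
        // Matches the testLimit:2 expectation above: needed = 2 blocks, limit = 1 block.
        System.out.println(bytesOverlimit(2 * blockSize, blockSize));    // 512
        System.out.println(remainingCapacity(2 * blockSize, blockSize)); // 0
      }
    }
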
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
index a477b7107e..2f36da11d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
@@ -38,6 +38,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -760,7 +761,7 @@ void prepare() throws Exception {
@Override
void invoke() throws Exception {
- result = client.addCacheDirective(directive);
+ result = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
}
@Override
@@ -802,7 +803,7 @@ class ModifyCacheDirectiveInfoOp extends AtMostOnceOp {
@Override
void prepare() throws Exception {
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
- id = client.addCacheDirective(directive);
+ id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
}
@Override
@@ -811,7 +812,7 @@ void invoke() throws Exception {
new CacheDirectiveInfo.Builder().
setId(id).
setReplication(newReplication).
- build());
+ build(), EnumSet.of(CacheFlag.FORCE));
}
@Override
@@ -858,7 +859,7 @@ class RemoveCacheDirectiveInfoOp extends AtMostOnceOp {
@Override
void prepare() throws Exception {
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
- id = dfs.addCacheDirective(directive);
+ id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
}
@Override
@@ -936,19 +937,19 @@ class ModifyCachePoolOp extends AtMostOnceOp {
@Override
void prepare() throws Exception {
- client.addCachePool(new CachePoolInfo(pool).setWeight(10));
+ client.addCachePool(new CachePoolInfo(pool).setLimit(10l));
}
@Override
void invoke() throws Exception {
- client.modifyCachePool(new CachePoolInfo(pool).setWeight(99));
+ client.modifyCachePool(new CachePoolInfo(pool).setLimit(99l));
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
- if (iter.hasNext() && iter.next().getInfo().getWeight() == 99) {
+ if (iter.hasNext() && (long)iter.next().getInfo().getLimit() == 99) {
return true;
}
Thread.sleep(1000);
@@ -1216,7 +1217,7 @@ public void testListCacheDirectives() throws Exception {
CacheDirectiveInfo directiveInfo =
new CacheDirectiveInfo.Builder().setPool(poolName).setPath(path).build();
dfs.addCachePool(new CachePoolInfo(poolName));
- dfs.addCacheDirective(directiveInfo);
+ dfs.addCacheDirective(directiveInfo, EnumSet.of(CacheFlag.FORCE));
poolNames.add(poolName);
}
listCacheDirectives(poolNames, 0);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
index 3c7fcbbf98..dc60e3424b 100644
Binary files a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored differ
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
index 0adc5836f7..bd1224d3b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
@@ -13,8 +13,8 @@
2
1
- 1386695013416
- 360a10c6ecac725e
+ 1387701670577
+ 7bb5467995769b59
@@ -24,8 +24,8 @@
3
2
- 1386695013425
- 9b110c0b83225f7d
+ 1387701670580
+ a5a3a2755e36827b
@@ -37,17 +37,17 @@
16386
/file_create_u\0001;F431
1
- 1386003814612
- 1386003814612
+ 1387010471220
+ 1387010471220
512
- DFSClient_NONMAPREDUCE_-1253204429_1
+ DFSClient_NONMAPREDUCE_-52011019_1
127.0.0.1
- aagarwal
+ andrew
supergroup
420
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
7
@@ -59,13 +59,13 @@
0
/file_create_u\0001;F431
1
- 1386003814665
- 1386003814612
+ 1387010471276
+ 1387010471220
512
- aagarwal
+ andrew
supergroup
420
@@ -78,8 +78,8 @@
0
/file_create_u\0001;F431
/file_moved
- 1386003814671
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 1387010471286
+ 508263bb-692e-4439-8738-ff89b8b03923
9
@@ -89,8 +89,8 @@
7
0
/file_moved
- 1386003814678
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 1387010471299
+ 508263bb-692e-4439-8738-ff89b8b03923
10
@@ -101,9 +101,9 @@
0
16387
/directory_mkdir
- 1386003814686
+ 1387010471312
- aagarwal
+ andrew
supergroup
493
@@ -136,7 +136,7 @@
12
/directory_mkdir
snapshot1
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
15
@@ -147,7 +147,7 @@
/directory_mkdir
snapshot1
snapshot2
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
16
@@ -157,7 +157,7 @@
14
/directory_mkdir
snapshot2
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
17
@@ -169,17 +169,17 @@
16388
/file_create_u\0001;F431
1
- 1386003814712
- 1386003814712
+ 1387010471373
+ 1387010471373
512
- DFSClient_NONMAPREDUCE_-1253204429_1
+ DFSClient_NONMAPREDUCE_-52011019_1
127.0.0.1
- aagarwal
+ andrew
supergroup
420
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
18
@@ -191,13 +191,13 @@
0
/file_create_u\0001;F431
1
- 1386003814714
- 1386003814712
+ 1387010471380
+ 1387010471373
512
- aagarwal
+ andrew
supergroup
420
@@ -253,9 +253,9 @@
0
/file_create_u\0001;F431
/file_moved
- 1386003814732
+ 1387010471428
NONE
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
25
@@ -267,17 +267,17 @@
16389
/file_concat_target
1
- 1386003814737
- 1386003814737
+ 1387010471438
+ 1387010471438
512
- DFSClient_NONMAPREDUCE_-1253204429_1
+ DFSClient_NONMAPREDUCE_-52011019_1
127.0.0.1
- aagarwal
+ andrew
supergroup
420
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
27
@@ -388,8 +388,8 @@
0
/file_concat_target
1
- 1386003814889
- 1386003814737
+ 1387010471540
+ 1387010471438
512
@@ -409,7 +409,7 @@
1003
- aagarwal
+ andrew
supergroup
420
@@ -423,17 +423,17 @@
16390
/file_concat_0
1
- 1386003814891
- 1386003814891
+ 1387010471547
+ 1387010471547
512
- DFSClient_NONMAPREDUCE_-1253204429_1
+ DFSClient_NONMAPREDUCE_-52011019_1
127.0.0.1
- aagarwal
+ andrew
supergroup
420
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
40
@@ -544,8 +544,8 @@
0
/file_concat_0
1
- 1386003814914
- 1386003814891
+ 1387010471588
+ 1387010471547
512
@@ -565,7 +565,7 @@
1006
- aagarwal
+ andrew
supergroup
420
@@ -579,17 +579,17 @@
16391
/file_concat_1
1
- 1386003814916
- 1386003814916
+ 1387010471595
+ 1387010471595
512
- DFSClient_NONMAPREDUCE_-1253204429_1
+ DFSClient_NONMAPREDUCE_-52011019_1
127.0.0.1
- aagarwal
+ andrew
supergroup
420
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
52
@@ -700,8 +700,8 @@
0
/file_concat_1
1
- 1386003814938
- 1386003814916
+ 1387010471651
+ 1387010471595
512
@@ -721,7 +721,7 @@
1009
- aagarwal
+ andrew
supergroup
420
@@ -733,12 +733,12 @@
56
0
/file_concat_target
- 1386003814940
+ 1387010471663
/file_concat_0
/file_concat_1
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
63
@@ -750,14 +750,14 @@
16392
/file_symlink
/file_concat_target
- 1386003814956
- 1386003814956
+ 1387010471674
+ 1387010471674
- aagarwal
+ andrew
supergroup
511
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
64
@@ -768,14 +768,14 @@
HDFS_DELEGATION_TOKEN
1
- aagarwal
+ andrew
JobTracker
- 1386003814961
- 1386608614961
+ 1387010471682
+ 1387615271682
2
- 1386090214961
+ 1387096871682
@@ -785,14 +785,14 @@
HDFS_DELEGATION_TOKEN
1
- aagarwal
+ andrew
JobTracker
- 1386003814961
- 1386608614961
+ 1387010471682
+ 1387615271682
2
- 1386090215078
+ 1387096871717
@@ -802,11 +802,11 @@
HDFS_DELEGATION_TOKEN
1
- aagarwal
+ andrew
JobTracker
- 1386003814961
- 1386608614961
+ 1387010471682
+ 1387615271682
2
@@ -816,13 +816,11 @@
61
poolparty
-
- aagarwal
- staff
- 493
-
- 100
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ andrew
+ andrew
+ 493
+ 9223372036854775807
+ 508263bb-692e-4439-8738-ff89b8b03923
68
@@ -834,8 +832,8 @@
carlton
party
448
- 1989
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 1989
+ 508263bb-692e-4439-8738-ff89b8b03923
69
@@ -848,7 +846,7 @@
1
poolparty
-1
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
70
@@ -858,7 +856,7 @@
64
1
/bar2
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
71
@@ -867,7 +865,7 @@
65
1
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
72
@@ -876,7 +874,7 @@
66
poolparty
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
73
@@ -888,17 +886,17 @@
16393
/hard-lease-recovery-test
1
- 1386003815135
- 1386003815135
+ 1387010471802
+ 1387010471802
512
- DFSClient_NONMAPREDUCE_-1253204429_1
+ DFSClient_NONMAPREDUCE_-52011019_1
127.0.0.1
- aagarwal
+ andrew
supergroup
420
- f583267a-ef8c-4f3f-9014-b067b83945ad
+ 508263bb-692e-4439-8738-ff89b8b03923
74
@@ -955,7 +953,7 @@
OP_REASSIGN_LEASE
73
- DFSClient_NONMAPREDUCE_-1253204429_1
+ DFSClient_NONMAPREDUCE_-52011019_1
/hard-lease-recovery-test
HDFS_NameNode
@@ -968,8 +966,8 @@
0
/hard-lease-recovery-test
1
- 1386003817462
- 1386003815135
+ 1387010474126
+ 1387010471802
512
@@ -979,7 +977,7 @@
1011
- aagarwal
+ andrew
supergroup
420
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCacheAdminConf.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCacheAdminConf.xml
index c793bf964a..77f8671748 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCacheAdminConf.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCacheAdminConf.xml
@@ -80,8 +80,8 @@
Testing modifying a cache pool
- -addPool poolparty -owner alice -group alicegroup -mode 0000 -weight 50
- -modifyPool poolparty -owner bob -group bobgroup -mode 0777 -weight 51
+ -addPool poolparty -owner alice -group alicegroup -mode 0000 -limit 50
+ -modifyPool poolparty -owner bob -group bobgroup -mode 0777 -limit 51
-listPools
@@ -90,7 +90,7 @@
SubstringComparator
- poolparty bob bobgroup rwxrwxrwx 51
+ poolparty bob bobgroup rwxrwxrwx 51
@@ -129,11 +129,11 @@
SubstringComparator
- bar alice alicegroup rwxr-xr-x 100
+ bar alice alicegroup rwxr-xr-x unlimited
SubstringComparator
- foo bob bob rw-rw-r-- 100
+ foo bob bob rw-rw-r-- unlimited
@@ -156,7 +156,7 @@
SubstringComparator
- foo bob bob rw-rw-r-- 100
+ foo bob bob rw-rw-r-- unlimited
@@ -417,11 +417,11 @@
SubstringComparator
- bar alice alicegroup rwxr-xr-x 100 0 0 0 0
+ bar alice alicegroup rwxr-xr-x unlimited 0 0 0 0 0
SubstringComparator
- foo bob bob rw-rw-r-- 100 0 0 0 0
+ foo bob bob rw-rw-r-- unlimited 0 0 0 0 0