HDFS-15003. RBF: Make Router support storage type quota. Contributed by Jinglun.

This commit is contained in:
Ayush Saxena 2019-12-27 09:22:58 +05:30
parent 80f91d14ab
commit 8730a7bf60
13 changed files with 609 additions and 40 deletions

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSUtil.isParentEntry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -28,6 +29,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.StorageType;
@ -165,12 +168,16 @@ public class Quota {
long nQuota = HdfsConstants.QUOTA_RESET;
long sQuota = HdfsConstants.QUOTA_RESET;
long[] typeQuota = new long[StorageType.values().length];
eachByStorageType(t -> typeQuota[t.ordinal()] = HdfsConstants.QUOTA_RESET);
RouterQuotaManager manager = this.router.getQuotaManager();
TreeMap<String, RouterQuotaUsage> pts =
manager.getParentsContainingQuota(path);
Entry<String, RouterQuotaUsage> entry = pts.lastEntry();
while (entry != null && (nQuota == HdfsConstants.QUOTA_RESET
|| sQuota == HdfsConstants.QUOTA_RESET)) {
|| sQuota == HdfsConstants.QUOTA_RESET || orByStorageType(
t -> typeQuota[t.ordinal()] == HdfsConstants.QUOTA_RESET))) {
String ppath = entry.getKey();
QuotaUsage quota = entry.getValue();
if (nQuota == HdfsConstants.QUOTA_RESET) {
@ -179,9 +186,15 @@ public class Quota {
if (sQuota == HdfsConstants.QUOTA_RESET) {
sQuota = quota.getSpaceQuota();
}
eachByStorageType(t -> {
if (typeQuota[t.ordinal()] == HdfsConstants.QUOTA_RESET) {
typeQuota[t.ordinal()] = quota.getTypeQuota(t);
}
});
entry = pts.lowerEntry(ppath);
}
return new QuotaUsage.Builder().quota(nQuota).spaceQuota(sQuota).build();
return new QuotaUsage.Builder().quota(nQuota).spaceQuota(sQuota)
.typeQuota(typeQuota).build();
}
/**
@ -244,8 +257,11 @@ public class Quota {
Map<RemoteLocation, QuotaUsage> results) throws IOException {
long nsCount = 0;
long ssCount = 0;
long[] typeCount = new long[StorageType.values().length];
long nsQuota = HdfsConstants.QUOTA_RESET;
long ssQuota = HdfsConstants.QUOTA_RESET;
long[] typeQuota = new long[StorageType.values().length];
eachByStorageType(t -> typeQuota[t.ordinal()] = HdfsConstants.QUOTA_RESET);
boolean hasQuotaUnset = false;
boolean isMountEntry = isMountEntry(path);
@ -255,22 +271,27 @@ public class Quota {
if (isMountEntry) {
nsCount += usage.getFileAndDirectoryCount();
ssCount += usage.getSpaceConsumed();
eachByStorageType(
t -> typeCount[t.ordinal()] += usage.getTypeConsumed(t));
} else if (usage != null) {
// If quota is not set in real FileSystem, the usage
// value will return -1.
if (usage.getQuota() == -1 && usage.getSpaceQuota() == -1) {
if (!RouterQuotaManager.isQuotaSet(usage)) {
hasQuotaUnset = true;
}
nsQuota = usage.getQuota();
ssQuota = usage.getSpaceQuota();
eachByStorageType(t -> typeQuota[t.ordinal()] = usage.getTypeQuota(t));
nsCount += usage.getFileAndDirectoryCount();
ssCount += usage.getSpaceConsumed();
LOG.debug(
"Get quota usage for path: nsId: {}, dest: {},"
+ " nsCount: {}, ssCount: {}.",
eachByStorageType(
t -> typeCount[t.ordinal()] += usage.getTypeConsumed(t));
LOG.debug("Get quota usage for path: nsId: {}, dest: {},"
+ " nsCount: {}, ssCount: {}, typeCount: {}.",
loc.getNameserviceId(), loc.getDest(),
usage.getFileAndDirectoryCount(), usage.getSpaceConsumed());
usage.getFileAndDirectoryCount(), usage.getSpaceConsumed(),
usage.toString(false, true, Arrays.asList(StorageType.values())));
}
}
@ -278,19 +299,57 @@ public class Quota {
QuotaUsage quota = getGlobalQuota(path);
nsQuota = quota.getQuota();
ssQuota = quota.getSpaceQuota();
eachByStorageType(t -> typeQuota[t.ordinal()] = quota.getTypeQuota(t));
}
QuotaUsage.Builder builder = new QuotaUsage.Builder()
.fileAndDirectoryCount(nsCount).spaceConsumed(ssCount);
QuotaUsage.Builder builder =
new QuotaUsage.Builder().fileAndDirectoryCount(nsCount)
.spaceConsumed(ssCount).typeConsumed(typeCount);
if (hasQuotaUnset) {
builder.quota(HdfsConstants.QUOTA_RESET)
.spaceQuota(HdfsConstants.QUOTA_RESET);
eachByStorageType(t -> builder.typeQuota(t, HdfsConstants.QUOTA_RESET));
} else {
builder.quota(nsQuota).spaceQuota(ssQuota);
eachByStorageType(t -> builder.typeQuota(t, typeQuota[t.ordinal()]));
}
return builder.build();
}
/**
* Invoke consumer by each storage type.
* @param consumer the function consuming the storage type.
*/
public static void eachByStorageType(Consumer<StorageType> consumer) {
for (StorageType type : StorageType.values()) {
consumer.accept(type);
}
}
/**
* Invoke predicate by each storage type and bitwise inclusive OR the results.
* @param predicate the function test the storage type.
*/
public static boolean orByStorageType(Predicate<StorageType> predicate) {
boolean res = false;
for (StorageType type : StorageType.values()) {
res |= predicate.test(type);
}
return res;
}
/**
* Invoke predicate by each storage type and bitwise AND the results.
* @param predicate the function test the storage type.
*/
public static boolean andByStorageType(Predicate<StorageType> predicate) {
boolean res = false;
for (StorageType type : StorageType.values()) {
res &= predicate.test(type);
}
return res;
}
/**
* Get all quota remote locations across subclusters under given
* federation path.

View File

@ -32,6 +32,7 @@ import java.util.Set;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -287,11 +288,24 @@ public class RouterAdminServer extends AbstractService
UpdateMountTableEntryResponse response = getMountTableStore()
.updateMountTableEntry(request);
try {
if (updateEntry != null && router.isQuotaEnabled()
&& isQuotaUpdated(request, oldEntry)) {
if (updateEntry != null && router.isQuotaEnabled()) {
// update quota.
if (isQuotaUpdated(request, oldEntry)) {
synchronizeQuota(updateEntry.getSourcePath(),
updateEntry.getQuota().getQuota(),
updateEntry.getQuota().getSpaceQuota());
updateEntry.getQuota().getSpaceQuota(), null);
}
// update storage type quota.
RouterQuotaUsage newQuota = request.getEntry().getQuota();
boolean locationsChanged = oldEntry == null ||
!oldEntry.getDestinations().equals(updateEntry.getDestinations());
for (StorageType t : StorageType.values()) {
if (locationsChanged || oldEntry.getQuota().getTypeQuota(t)
!= newQuota.getTypeQuota(t)) {
synchronizeQuota(updateEntry.getSourcePath(),
HdfsConstants.QUOTA_DONT_SET, newQuota.getTypeQuota(t), t);
}
}
}
} catch (Exception e) {
// Ignore exception, if any while reseting quota. Specifically to handle
@ -344,16 +358,17 @@ public class RouterAdminServer extends AbstractService
* @param path Source path in given mount table.
* @param nsQuota Name quota definition in given mount table.
* @param ssQuota Space quota definition in given mount table.
* @param type Storage type of quota. Null if it's not a storage type quota.
* @throws IOException
*/
private void synchronizeQuota(String path, long nsQuota, long ssQuota)
throws IOException {
private void synchronizeQuota(String path, long nsQuota, long ssQuota,
StorageType type) throws IOException {
if (isQuotaSyncRequired(nsQuota, ssQuota)) {
if (iStateStoreCache) {
((StateStoreCache) this.router.getSubclusterResolver()).loadCache(true);
}
Quota routerQuota = this.router.getRpcServer().getQuotaModule();
routerQuota.setQuota(path, nsQuota, ssQuota, null, false);
routerQuota.setQuota(path, nsQuota, ssQuota, type, false);
}
}
@ -380,7 +395,7 @@ public class RouterAdminServer extends AbstractService
// clear sub-cluster's quota definition
try {
synchronizeQuota(request.getSrcPath(), HdfsConstants.QUOTA_RESET,
HdfsConstants.QUOTA_RESET);
HdfsConstants.QUOTA_RESET, null);
} catch (Exception e) {
// Ignore exception, if any while reseting quota. Specifically to handle
// if the actual destination doesn't exist.

View File

@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
/**
@ -217,17 +218,18 @@ public class RouterQuotaManager {
/**
* Check if the quota was set.
* @param quota RouterQuotaUsage set in mount table.
* @param quota the quota usage.
* @return True if the quota is set.
*/
public boolean isQuotaSet(RouterQuotaUsage quota) {
public static boolean isQuotaSet(QuotaUsage quota) {
if (quota != null) {
long nsQuota = quota.getQuota();
long ssQuota = quota.getSpaceQuota();
// once nsQuota or ssQuota was set, this mount table is quota set
if (nsQuota != HdfsConstants.QUOTA_RESET
|| ssQuota != HdfsConstants.QUOTA_RESET) {
|| ssQuota != HdfsConstants.QUOTA_RESET || Quota.orByStorageType(
t -> quota.getTypeQuota(t) != HdfsConstants.QUOTA_RESET)) {
return true;
}
}

View File

@ -30,6 +30,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
@ -86,6 +88,9 @@ public class RouterQuotaUpdateService extends PeriodicService {
RouterQuotaUsage oldQuota = entry.getQuota();
long nsQuota = oldQuota.getQuota();
long ssQuota = oldQuota.getSpaceQuota();
long[] typeQuota = new long[StorageType.values().length];
Quota.eachByStorageType(
t -> typeQuota[t.ordinal()] = oldQuota.getTypeQuota(t));
QuotaUsage currentQuotaUsage = null;
@ -95,11 +100,12 @@ public class RouterQuotaUpdateService extends PeriodicService {
// For other mount entry get current quota usage
HdfsFileStatus ret = this.rpcServer.getFileInfo(src);
if (ret == null || ret.getModificationTime() == 0) {
currentQuotaUsage = new RouterQuotaUsage.Builder()
.fileAndDirectoryCount(0)
.quota(nsQuota)
.spaceConsumed(0)
.spaceQuota(ssQuota).build();
long[] zeroConsume = new long[StorageType.values().length];
currentQuotaUsage =
new RouterQuotaUsage.Builder().fileAndDirectoryCount(0)
.quota(nsQuota).spaceConsumed(0).spaceQuota(ssQuota)
.typeConsumed(zeroConsume)
.typeQuota(typeQuota).build();
} else {
// Call RouterRpcServer#getQuotaUsage for getting current quota usage.
// If any exception occurs catch it and proceed with other entries.
@ -147,6 +153,16 @@ public class RouterQuotaUpdateService extends PeriodicService {
remoteQuota.getSpaceQuota(), gQuota.getQuota(),
gQuota.getSpaceQuota());
}
for (StorageType t : StorageType.values()) {
if (remoteQuota.getTypeQuota(t) != gQuota.getTypeQuota(t)) {
this.rpcServer.getQuotaModule()
.setQuotaInternal(location.getSrc(), Arrays.asList(location),
HdfsConstants.QUOTA_DONT_SET, gQuota.getTypeQuota(t), t);
LOG.info("[Fix Quota] src={} dst={} type={} oldQuota={} newQuota={}",
location.getSrc(), location, t, remoteQuota.getTypeQuota(t),
gQuota.getTypeQuota(t));
}
}
}
/**
@ -234,11 +250,15 @@ public class RouterQuotaUpdateService extends PeriodicService {
*/
private RouterQuotaUsage generateNewQuota(RouterQuotaUsage oldQuota,
QuotaUsage currentQuotaUsage) {
RouterQuotaUsage newQuota = new RouterQuotaUsage.Builder()
RouterQuotaUsage.Builder newQuotaBuilder = new RouterQuotaUsage.Builder()
.fileAndDirectoryCount(currentQuotaUsage.getFileAndDirectoryCount())
.quota(oldQuota.getQuota())
.spaceConsumed(currentQuotaUsage.getSpaceConsumed())
.spaceQuota(oldQuota.getSpaceQuota()).build();
return newQuota;
.spaceQuota(oldQuota.getSpaceQuota());
Quota.eachByStorageType(t -> {
newQuotaBuilder.typeQuota(t, oldQuota.getTypeQuota(t));
newQuotaBuilder.typeConsumed(t, currentQuotaUsage.getTypeConsumed(t));
});
return newQuotaBuilder.build();
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
@ -67,6 +68,24 @@ public final class RouterQuotaUsage extends QuotaUsage {
super.spaceQuota(spaceQuota);
return this;
}
@Override
public Builder typeConsumed(long[] typeConsumed) {
super.typeConsumed(typeConsumed);
return this;
}
@Override
public Builder typeQuota(long[] typeQuota) {
super.typeQuota(typeQuota);
return this;
}
@Override
public Builder typeQuota(StorageType type, long quota) {
super.typeQuota(type, quota);
return this;
}
}
/**
@ -95,6 +114,24 @@ public final class RouterQuotaUsage extends QuotaUsage {
}
}
/**
* Verify space quota by storage type is violated once quota is set. Relevant
* method {@link DirectoryWithQuotaFeature#verifyQuotaByStorageType}.
* @throws DSQuotaExceededException If the quota is exceeded.
*/
public void verifyQuotaByStorageType() throws DSQuotaExceededException {
for (StorageType t: StorageType.getTypesSupportingQuota()) {
long typeQuota = getTypeQuota(t);
if (typeQuota == HdfsConstants.QUOTA_RESET) {
continue;
}
long typeConsumed = getTypeConsumed(t);
if (Quota.isViolated(typeQuota, typeConsumed)) {
throw new DSQuotaExceededException(typeQuota, typeConsumed);
}
}
}
@Override
public String toString() {
String nsQuota = "-";

View File

@ -1519,6 +1519,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
if (quotaUsage != null) {
quotaUsage.verifyNamespaceQuota();
quotaUsage.verifyStoragespaceQuota();
quotaUsage.verifyQuotaByStorageType();
}
}
}

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.Time;
import static org.apache.hadoop.hdfs.server.federation.router.Quota.eachByStorageType;
/**
* Implementation of the {@link MountTableStore} state store API.
@ -160,12 +161,16 @@ public class MountTableStoreImpl extends MountTableStore {
this.getQuotaManager().getQuotaUsage(record.getSourcePath());
if (quota != null) {
RouterQuotaUsage oldquota = record.getQuota();
RouterQuotaUsage newQuota = new RouterQuotaUsage.Builder()
RouterQuotaUsage.Builder builder = new RouterQuotaUsage.Builder()
.fileAndDirectoryCount(quota.getFileAndDirectoryCount())
.quota(oldquota.getQuota())
.spaceConsumed(quota.getSpaceConsumed())
.spaceQuota(oldquota.getSpaceQuota()).build();
record.setQuota(newQuota);
.spaceQuota(oldquota.getSpaceQuota());
eachByStorageType(t -> {
builder.typeQuota(t, oldquota.getTypeQuota(t));
builder.typeConsumed(t, quota.getTypeConsumed(t));
});
record.setQuota(builder.build());
}
}
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto.Builder;
@ -28,9 +29,13 @@ import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProt
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProtoOrBuilder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoteLocationProto;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.QuotaUsageProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeQuotaInfosProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeQuotaInfoProto;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import static org.apache.hadoop.hdfs.server.federation.router.Quota.eachByStorageType;
import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer;
import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker;
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
@ -275,17 +280,31 @@ public class MountTablePBImpl extends MountTable implements PBRecord {
long nsCount = RouterQuotaUsage.QUOTA_USAGE_COUNT_DEFAULT;
long ssQuota = HdfsConstants.QUOTA_RESET;
long ssCount = RouterQuotaUsage.QUOTA_USAGE_COUNT_DEFAULT;
long[] typeQuota = new long[StorageType.values().length];
long[] typeConsume = new long[StorageType.values().length];
eachByStorageType(t -> typeQuota[t.ordinal()] = HdfsConstants.QUOTA_RESET);
eachByStorageType(t -> typeConsume[t.ordinal()] =
RouterQuotaUsage.QUOTA_USAGE_COUNT_DEFAULT);
if (proto.hasQuota()) {
QuotaUsageProto quotaProto = proto.getQuota();
nsQuota = quotaProto.getQuota();
nsCount = quotaProto.getFileAndDirectoryCount();
ssQuota = quotaProto.getSpaceQuota();
ssCount = quotaProto.getSpaceConsumed();
if (quotaProto.hasTypeQuotaInfos()) {
StorageTypeQuotaInfosProto typeInfo = quotaProto.getTypeQuotaInfos();
for (StorageTypeQuotaInfoProto tp : typeInfo.getTypeQuotaInfoList()) {
typeQuota[StorageType.parseStorageType(tp.getType().name())
.ordinal()] = tp.getQuota();
typeConsume[StorageType.parseStorageType(tp.getType().name())
.ordinal()] = tp.getConsumed();
}
}
}
RouterQuotaUsage.Builder builder = new RouterQuotaUsage.Builder()
.quota(nsQuota).fileAndDirectoryCount(nsCount).spaceQuota(ssQuota)
.spaceConsumed(ssCount);
.spaceConsumed(ssCount).typeQuota(typeQuota).typeConsumed(typeConsume);
return builder.build();
}
@ -295,10 +314,21 @@ public class MountTablePBImpl extends MountTable implements PBRecord {
if (quota == null) {
builder.clearQuota();
} else {
QuotaUsageProto quotaUsage = QuotaUsageProto.newBuilder()
.setFileAndDirectoryCount(quota.getFileAndDirectoryCount())
QuotaUsageProto.Builder quotaBuilder = QuotaUsageProto.newBuilder();
quotaBuilder.setFileAndDirectoryCount(quota.getFileAndDirectoryCount())
.setQuota(quota.getQuota()).setSpaceConsumed(quota.getSpaceConsumed())
.setSpaceQuota(quota.getSpaceQuota()).build();
.setSpaceQuota(quota.getSpaceQuota());
if (quota.isTypeQuotaSet()) {
StorageTypeQuotaInfosProto.Builder infoBuilder =
StorageTypeQuotaInfosProto.newBuilder();
eachByStorageType(t -> infoBuilder.addTypeQuotaInfo(
StorageTypeQuotaInfoProto.newBuilder()
.setType(HdfsProtos.StorageTypeProto.valueOf(t.name()))
.setQuota(quota.getTypeQuota(t))
.setConsumed(quota.getTypeConsumed(t)).build()));
quotaBuilder.setTypeQuotaInfos(infoBuilder.build());
}
QuotaUsageProto quotaUsage = quotaBuilder.build();
builder.setQuota(quotaUsage);
}
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -83,6 +84,9 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.server.federation.router.Quota.eachByStorageType;
import static org.apache.hadoop.hdfs.server.federation.router.Quota.orByStorageType;
import static org.apache.hadoop.hdfs.server.federation.router.Quota.andByStorageType;
/**
* This class provides some Federation administrative access shell commands.
@ -125,8 +129,8 @@ public class RouterAdmin extends Configured implements Tool {
private String getUsage(String cmd) {
if (cmd == null) {
String[] commands =
{"-add", "-update", "-rm", "-ls", "-getDestination",
"-setQuota", "-clrQuota",
{"-add", "-update", "-rm", "-ls", "-getDestination", "-setQuota",
"-setStorageTypeQuota", "-clrQuota", "-clrStorageTypeQuota",
"-safemode", "-nameservice", "-getDisabledNameservices",
"-refresh", "-refreshRouterArgs",
"-refreshSuperUserGroupsConfiguration"};
@ -160,8 +164,13 @@ public class RouterAdmin extends Configured implements Tool {
} else if (cmd.equals("-setQuota")) {
return "\t[-setQuota <path> -nsQuota <nsQuota> -ssQuota "
+ "<quota in bytes or quota size string>]";
} else if (cmd.equals("-setStorageTypeQuota")) {
return "\t[-setStorageTypeQuota <path> -storageType <storage type> "
+ "<quota in bytes or quota size string>]";
} else if (cmd.equals("-clrQuota")) {
return "\t[-clrQuota <path>]";
} else if (cmd.equals("-clrStorageTypeQuota")) {
return "\t[-clrStorageTypeQuota <path>]";
} else if (cmd.equals("-safemode")) {
return "\t[-safemode enter | leave | get]";
} else if (cmd.equals("-nameservice")) {
@ -241,10 +250,18 @@ public class RouterAdmin extends Configured implements Tool {
if (argv.length < 4) {
return false;
}
} else if ("-setStorageTypeQuota".equals(cmd)) {
if (argv.length < 5) {
return false;
}
} else if ("-clrQuota".equals(cmd)) {
if (argv.length < 2) {
return false;
}
} else if ("-clrStorageTypeQuota".equals(cmd)) {
if (argv.length < 2) {
return false;
}
} else if ("-safemode".equals(cmd)) {
if (argv.length < 2) {
return false;
@ -336,6 +353,11 @@ public class RouterAdmin extends Configured implements Tool {
System.out.println(
"Successfully set quota for mount point " + argv[i]);
}
} else if ("-setStorageTypeQuota".equals(cmd)) {
if (setStorageTypeQuota(argv, i)) {
System.out.println(
"Successfully set storage type quota for mount point " + argv[i]);
}
} else if ("-clrQuota".equals(cmd)) {
while (i < argv.length) {
if (clrQuota(argv[i])) {
@ -344,6 +366,14 @@ public class RouterAdmin extends Configured implements Tool {
i++;
}
}
} else if ("-clrStorageTypeQuota".equals(cmd)) {
while (i < argv.length) {
if (clrStorageTypeQuota(argv[i])) {
System.out.println("Successfully clear storage type quota for mount"
+ " point " + argv[i]);
i++;
}
}
} else if ("-safemode".equals(cmd)) {
manageSafeMode(argv[i]);
} else if ("-nameservice".equals(cmd)) {
@ -887,6 +917,41 @@ public class RouterAdmin extends Configured implements Tool {
return updateQuota(mount, nsQuota, ssQuota);
}
/**
* Set storage type quota for a mount table entry.
*
* @param parameters Parameters of the quota.
* @param i Index in the parameters.
*/
private boolean setStorageTypeQuota(String[] parameters, int i)
throws IOException {
long[] typeQuota = new long[StorageType.values().length];
eachByStorageType(
t -> typeQuota[t.ordinal()] = HdfsConstants.QUOTA_DONT_SET);
String mount = parameters[i++];
if (parameters[i].equals("-storageType")) {
i++;
StorageType type = StorageType.parseStorageType(parameters[i++]);
typeQuota[type.ordinal()] = Long.parseLong(parameters[i]);
} else {
throw new IllegalArgumentException("Invalid argument : " + parameters[i]);
}
if (orByStorageType(t -> typeQuota[t.ordinal()] <= 0)) {
throw new IllegalArgumentException(
"Input quota value should be a positive number.");
}
if (andByStorageType(
t -> typeQuota[t.ordinal()] == HdfsConstants.QUOTA_DONT_SET)) {
throw new IllegalArgumentException(
"Must specify at least one of -nsQuota and -ssQuota.");
}
return updateStorageTypeQuota(mount, typeQuota);
}
/**
* Clear quota of the mount point.
*
@ -899,6 +964,19 @@ public class RouterAdmin extends Configured implements Tool {
HdfsConstants.QUOTA_RESET);
}
/**
* Clear storage type quota of the mount point.
*
* @param mount Mount table to clear
* @return If the quota was cleared.
* @throws IOException Error clearing the mount point.
*/
private boolean clrStorageTypeQuota(String mount) throws IOException {
long[] typeQuota = new long[StorageType.values().length];
eachByStorageType(t -> typeQuota[t.ordinal()] = HdfsConstants.QUOTA_RESET);
return updateStorageTypeQuota(mount, typeQuota);
}
/**
* Update quota of specified mount table.
*
@ -958,6 +1036,64 @@ public class RouterAdmin extends Configured implements Tool {
return updateResponse.getStatus();
}
/**
* Update storage type quota of specified mount table.
*
* @param mount Specified mount table to update.
* @param typeQuota Storage type quota.
* @return If the quota was updated.
* @throws IOException Error updating quota.
*/
private boolean updateStorageTypeQuota(String mount, long[] typeQuota)
throws IOException {
// Get existing entry
MountTableManager mountTable = client.getMountTableManager();
GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
.newInstance(mount);
GetMountTableEntriesResponse getResponse = mountTable
.getMountTableEntries(getRequest);
List<MountTable> results = getResponse.getEntries();
MountTable existingEntry = null;
for (MountTable result : results) {
if (mount.equals(result.getSourcePath())) {
existingEntry = result;
break;
}
}
if (existingEntry == null) {
throw new IOException(mount + " doesn't exist in mount table.");
} else {
final RouterQuotaUsage quotaUsage = existingEntry.getQuota();
long[] typeCount = new long[StorageType.values().length];
eachByStorageType(
t -> typeCount[t.ordinal()] = quotaUsage.getTypeQuota(t));
// If all storage type quota were reset, clear the storage type quota.
if (andByStorageType(
t -> typeQuota[t.ordinal()] == HdfsConstants.QUOTA_RESET)) {
eachByStorageType(t -> typeCount[t.ordinal()] =
RouterQuotaUsage.QUOTA_USAGE_COUNT_DEFAULT);
} else {
// If nsQuota or ssQuota was unset, use the value in mount table.
eachByStorageType(t -> {
if (typeQuota[t.ordinal()] == HdfsConstants.QUOTA_DONT_SET) {
typeQuota[t.ordinal()] = quotaUsage.getTypeQuota(t);
}
});
}
RouterQuotaUsage updatedQuota = new RouterQuotaUsage.Builder()
.typeQuota(typeQuota).typeConsumed(typeCount).build();
existingEntry.setQuota(updatedQuota);
}
UpdateMountTableEntryRequest updateRequest =
UpdateMountTableEntryRequest.newInstance(existingEntry);
UpdateMountTableEntryResponse updateResponse = mountTable
.updateMountTableEntry(updateRequest);
return updateResponse.getStatus();
}
/**
* Manager the safe mode state.
* @param cmd Input command, enter or leave safe mode.

View File

@ -239,6 +239,18 @@ The federation admin tool supports setting quotas for specified mount table entr
The above command means that we allow the path to have a maximum of 100 file/directories and use at most 1024 bytes storage space. The parameter for *ssQuota*
supports multiple size-unit suffix (e.g. 1k is 1KB, 5m is 5MB). If no suffix is specified then bytes is assumed.
Set storage type quota for specified mount table entry:
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -setStorageTypeQuota <path> -storageType <storage type>
Remove quota for specified mount table entry:
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -clrQuota <path>
Remove storage type quota for specified mount table entry:
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -clrStorageTypeQuota <path>
Ls command will show below information for each mount table entry:
Source Destinations Owner Group Mode Quota/Usage

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.List;
@ -33,6 +34,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
@ -663,8 +665,12 @@ public class TestRouterAdminCLI {
+ "-owner <owner> -group <group> -mode <mode>]\n" + "\t[-rm <source>]\n"
+ "\t[-ls [-d] <path>]\n"
+ "\t[-getDestination <path>]\n"
+ "\t[-setQuota <path> -nsQuota <nsQuota> -ssQuota "
+ "<quota in bytes or quota size string>]\n" + "\t[-clrQuota <path>]\n"
+ "\t[-setQuota <path> -nsQuota <nsQuota> -ssQuota"
+ " <quota in bytes or quota size string>]\n"
+ "\t[-setStorageTypeQuota <path> -storageType <storage type>"
+ " <quota in bytes or quota size string>]\n"
+ "\t[-clrQuota <path>]\n"
+ "\t[-clrStorageTypeQuota <path>]\n"
+ "\t[-safemode enter | leave | get]\n"
+ "\t[-nameservice enable | disable <nameservice>]\n"
+ "\t[-getDisabledNameservices]\n"
@ -674,6 +680,99 @@ public class TestRouterAdminCLI {
out.reset();
}
/**
* Test command -setStorageTypeQuota with wrong arguments.
*/
@Test
public void testWrongArgumentsWhenSetStorageTypeQuota() throws Exception {
String src = "/type-QuotaMounttable";
// verify wrong arguments.
System.setErr(new PrintStream(err));
String[] argv =
new String[] {"-setStorageTypeQuota", src, "check", "c2", "c3"};
ToolRunner.run(admin, argv);
assertTrue(err.toString().contains("Invalid argument : check"));
}
/**
* Test command -setStorageTypeQuota.
*/
@Test
public void testSetStorageTypeQuota() throws Exception {
String nsId = "ns0";
String src = "/type-QuotaMounttable";
String dest = "/type-QuotaMounttable";
try {
addMountTable(src, nsId, dest);
// verify the default quota.
MountTable mountTable = getMountTable(src).get(0);
RouterQuotaUsage quotaUsage = mountTable.getQuota();
for (StorageType t : StorageType.values()) {
assertEquals(RouterQuotaUsage.QUOTA_USAGE_COUNT_DEFAULT,
quotaUsage.getTypeConsumed(t));
assertEquals(HdfsConstants.QUOTA_RESET, quotaUsage.getTypeQuota(t));
}
// set storage type quota.
long ssQuota = 100;
setStorageTypeQuota(src, ssQuota, StorageType.DISK);
// verify if the quota is set
mountTable = getMountTable(src).get(0);
quotaUsage = mountTable.getQuota();
assertEquals(ssQuota, quotaUsage.getTypeQuota(StorageType.DISK));
} finally {
rmMountTable(src);
}
}
/**
* Test command -clrStorageTypeQuota.
*/
@Test
public void testClearStorageTypeQuota() throws Exception {
String nsId = "ns0";
String src = "/type-QuotaMounttable";
String src1 = "/type-QuotaMounttable1";
String dest = "/type-QuotaMounttable";
String dest1 = "/type-QuotaMounttable1";
long ssQuota = 100;
try {
// add mount points.
addMountTable(src, nsId, dest);
addMountTable(src1, nsId, dest1);
// set storage type quota to src and src1.
setStorageTypeQuota(src, ssQuota, StorageType.DISK);
assertEquals(ssQuota,
getMountTable(src).get(0).getQuota().getTypeQuota(StorageType.DISK));
setStorageTypeQuota(src1, ssQuota, StorageType.DISK);
assertEquals(ssQuota,
getMountTable(src1).get(0).getQuota().getTypeQuota(StorageType.DISK));
// clrQuota of src and src1.
assertEquals(0, ToolRunner
.run(admin, new String[] {"-clrStorageTypeQuota", src, src1}));
stateStore.loadCache(MountTableStoreImpl.class, true);
// Verify whether the storage type quotas are cleared.
List<MountTable> mountTables = getMountTable("/");
for (int i = 0; i < 2; i++) {
MountTable mountTable = mountTables.get(i);
RouterQuotaUsage quotaUsage = mountTable.getQuota();
for (StorageType t : StorageType.values()) {
assertEquals(RouterQuotaUsage.QUOTA_USAGE_COUNT_DEFAULT,
quotaUsage.getTypeConsumed(t));
assertEquals(HdfsConstants.QUOTA_RESET, quotaUsage.getTypeQuota(t));
}
}
} finally {
rmMountTable(src);
rmMountTable(src1);
}
}
@Test
public void testSetAndClearQuota() throws Exception {
String nsId = "ns0";
@ -1453,4 +1552,33 @@ public class TestRouterAdminCLI {
"-order", "HASH_ALL", "-faulttolerant"};
assertEquals(0, ToolRunner.run(admin, argv));
}
private void addMountTable(String src, String nsId, String dst)
throws Exception {
String[] argv = new String[] {"-add", src, nsId, dst};
assertEquals(0, ToolRunner.run(admin, argv));
stateStore.loadCache(MountTableStoreImpl.class, true);
}
private List<MountTable> getMountTable(String src) throws IOException {
GetMountTableEntriesRequest getRequest =
GetMountTableEntriesRequest.newInstance(src);
GetMountTableEntriesResponse getResponse =
client.getMountTableManager().getMountTableEntries(getRequest);
return getResponse.getEntries();
}
private void setStorageTypeQuota(String src, long ssQuota, StorageType type)
throws Exception {
assertEquals(0, ToolRunner.run(admin,
new String[] {"-setStorageTypeQuota", src, "-storageType", type.name(),
String.valueOf(ssQuota)}));
stateStore.loadCache(MountTableStoreImpl.class, true);
}
private void rmMountTable(String src) throws Exception {
String[] argv = new String[] {"-rm", src};
assertEquals(0, ToolRunner.run(admin, argv));
stateStore.loadCache(MountTableStoreImpl.class, true);
}
}

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
@ -231,6 +232,30 @@ public class TestRouterQuota {
appendData("/testdir4/file", nnContext2.getClient(), BLOCK_SIZE);
}
@Test
public void testStorageTypeQuotaExceed() throws Exception {
long ssQuota = BLOCK_SIZE * 3;
DFSClient routerClient = routerContext.getClient();
prepareStorageTypeQuotaTestMountTable(StorageType.DISK, BLOCK_SIZE,
ssQuota * 2, ssQuota, BLOCK_SIZE + 1, BLOCK_SIZE + 1);
// Verify quota exceed on Router.
LambdaTestUtils.intercept(DSQuotaExceededException.class,
"The DiskSpace quota is exceeded", "Expect quota exceed exception.",
() -> appendData("/type0/file", routerClient, BLOCK_SIZE));
LambdaTestUtils.intercept(DSQuotaExceededException.class,
"The DiskSpace quota is exceeded", "Expect quota exceed exception.",
() -> appendData("/type0/type1/file", routerClient, BLOCK_SIZE));
// Verify quota exceed on NN1.
LambdaTestUtils.intercept(QuotaByStorageTypeExceededException.class,
"Quota by storage type", "Expect quota exceed exception.",
() -> appendData("/type0/file", nnContext1.getClient(), BLOCK_SIZE));
LambdaTestUtils.intercept(QuotaByStorageTypeExceededException.class,
"Quota by storage type", "Expect quota exceed exception.",
() -> appendData("/type1/file", nnContext1.getClient(), BLOCK_SIZE));
}
/**
* Add a mount table entry to the mount table through the admin API.
* @param entry Mount table entry to add.
@ -377,6 +402,46 @@ public class TestRouterQuota {
assertEquals(ssQuota, quota2.getSpaceQuota());
}
@Test
public void testStorageTypeQuota() throws Exception {
long ssQuota = BLOCK_SIZE * 3;
int fileSize = BLOCK_SIZE;
prepareStorageTypeQuotaTestMountTable(StorageType.DISK, BLOCK_SIZE,
ssQuota * 2, ssQuota, fileSize, fileSize);
// Verify /type0 quota on NN1.
ClientProtocol client = nnContext1.getClient().getNamenode();
QuotaUsage usage = client.getQuotaUsage("/type0");
assertEquals(HdfsConstants.QUOTA_RESET, usage.getQuota());
assertEquals(HdfsConstants.QUOTA_RESET, usage.getSpaceQuota());
verifyTypeQuotaAndConsume(new long[] {-1, -1, ssQuota * 2, -1, -1}, null,
usage);
// Verify /type1 quota on NN1.
usage = client.getQuotaUsage("/type1");
assertEquals(HdfsConstants.QUOTA_RESET, usage.getQuota());
assertEquals(HdfsConstants.QUOTA_RESET, usage.getSpaceQuota());
verifyTypeQuotaAndConsume(new long[] {-1, -1, ssQuota, -1, -1}, null,
usage);
FileSystem routerFs = routerContext.getFileSystem();
QuotaUsage u0 = routerFs.getQuotaUsage(new Path("/type0"));
QuotaUsage u1 = routerFs.getQuotaUsage(new Path("/type0/type1"));
// Verify /type0/type1 storage type quota usage on Router.
assertEquals(HdfsConstants.QUOTA_RESET, u1.getQuota());
assertEquals(2, u1.getFileAndDirectoryCount());
assertEquals(HdfsConstants.QUOTA_RESET, u1.getSpaceQuota());
assertEquals(fileSize * 3, u1.getSpaceConsumed());
verifyTypeQuotaAndConsume(new long[] {-1, -1, ssQuota, -1, -1},
new long[] {0, 0, fileSize * 3, 0, 0}, u1);
// Verify /type0 storage type quota usage on Router.
assertEquals(HdfsConstants.QUOTA_RESET, u0.getQuota());
assertEquals(4, u0.getFileAndDirectoryCount());
assertEquals(HdfsConstants.QUOTA_RESET, u0.getSpaceQuota());
assertEquals(fileSize * 3 * 2, u0.getSpaceConsumed());
verifyTypeQuotaAndConsume(new long[] {-1, -1, ssQuota * 2, -1, -1},
new long[] {0, 0, fileSize * 3 * 2, 0, 0}, u0);
}
@Test
public void testGetQuota() throws Exception {
long nsQuota = 10;
@ -1154,4 +1219,59 @@ public class TestRouterQuota {
.getQuotaCacheUpdateService();
updateService.periodicInvoke();
}
/**
* Add two mount tables.
* /type0 --> ns0---/type0 [-1, -1, {STORAGE_TYPE:quota}]
* /type0/type1 --> ns0---/type1 [-1, -1, {STORAGE_TYPE:quota}]
*
* Add two files with storage policy HOT.
* /type0/file --> ns0---/type0/file
* /type0/type1/file --> ns0---/type1/file
*/
private void prepareStorageTypeQuotaTestMountTable(StorageType type,
long blkSize, long quota0, long quota1, int len0, int len1)
throws Exception {
final FileSystem nnFs1 = nnContext1.getFileSystem();
nnFs1.mkdirs(new Path("/type0"));
nnFs1.mkdirs(new Path("/type1"));
((DistributedFileSystem) nnContext1.getFileSystem())
.createFile(new Path("/type0/file")).storagePolicyName("HOT")
.blockSize(blkSize).build().close();
((DistributedFileSystem) nnContext1.getFileSystem())
.createFile(new Path("/type1/file")).storagePolicyName("HOT")
.blockSize(blkSize).build().close();
DFSClient client = nnContext1.getClient();
appendData("/type0/file", client, len0);
appendData("/type1/file", client, len1);
MountTable mountTable = MountTable
.newInstance("/type0", Collections.singletonMap("ns0", "/type0"));
mountTable.setQuota(
new RouterQuotaUsage.Builder().typeQuota(type, quota0).build());
addMountTable(mountTable);
mountTable = MountTable
.newInstance("/type0/type1", Collections.singletonMap("ns0", "/type1"));
mountTable.setQuota(
new RouterQuotaUsage.Builder().typeQuota(type, quota1).build());
addMountTable(mountTable);
// ensure mount table is updated to Router.
RouterQuotaUpdateService updateService = routerContext.getRouter()
.getQuotaCacheUpdateService();
updateService.periodicInvoke();
}
private void verifyTypeQuotaAndConsume(long[] quota, long[] consume,
QuotaUsage usage) {
for (StorageType t : StorageType.values()) {
if (quota != null) {
assertEquals(quota[t.ordinal()], usage.getTypeQuota(t));
}
if (consume != null) {
assertEquals(consume[t.ordinal()], usage.getTypeConsumed(t));
}
}
}
}

View File

@ -438,7 +438,9 @@ Usage:
[-ls [-d] <path>]
[-getDestination <path>]
[-setQuota <path> -nsQuota <nsQuota> -ssQuota <quota in bytes or quota size string>]
[-setStorageTypeQuota <path> -storageType <storage type> <quota in bytes or quota size string>]
[-clrQuota <path>]
[-clrStorageTypeQuota <path>]
[-safemode enter | leave | get]
[-nameservice disable | enable <nameservice>]
[-getDisabledNameservices]
@ -454,7 +456,9 @@ Usage:
| `-ls` `[-d]` *path* | List mount points under specified path. Specify -d parameter to get detailed listing.|
| `-getDestination` *path* | Get the subcluster where a file is or should be created. |
| `-setQuota` *path* `-nsQuota` *nsQuota* `-ssQuota` *ssQuota* | Set quota for specified path. See [HDFS Quotas Guide](./HdfsQuotaAdminGuide.html) for the quota detail. |
| `-setStorageTypeQuota` *path* `-storageType` *storageType* *stQuota* | Set storage type quota for specified path. See [HDFS Quotas Guide](./HdfsQuotaAdminGuide.html) for the quota detail. |
| `-clrQuota` *path* | Clear quota of given mount point. See [HDFS Quotas Guide](./HdfsQuotaAdminGuide.html) for the quota detail. |
| `-clrStorageTypeQuota` *path* | Clear storage type quota of given mount point. See [HDFS Quotas Guide](./HdfsQuotaAdminGuide.html) for the quota detail. |
| `-safemode` `enter` `leave` `get` | Manually set the Router entering or leaving safe mode. The option *get* will be used for verifying if the Router is in safe mode state. |
| `-nameservice` `disable` `enable` *nameservice* | Disable/enable a name service from the federation. If disabled, requests will not go to that name service. |
| `-getDisabledNameservices` | Get the name services that are disabled in the federation. |