HDFS-13811. RBF: Race condition between router admin quota update and periodic quota update service. Contributed by Jinglun.
This commit is contained in:
parent
7c869b4df2
commit
47fdae7904
@ -43,6 +43,7 @@
|
||||
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
@ -292,6 +293,13 @@ protected void serviceInit(Configuration configuration) throws Exception {
|
||||
}
|
||||
|
||||
super.serviceInit(conf);
|
||||
|
||||
// Set quota manager in mount store to update quota usage in mount table.
|
||||
if (stateStore != null) {
|
||||
MountTableStore mountstore =
|
||||
this.stateStore.getRegisteredRecordStore(MountTableStore.class);
|
||||
mountstore.setQuotaManager(this.quotaManager);
|
||||
}
|
||||
}
|
||||
|
||||
private String getDisabledDependentServices() {
|
||||
|
@ -169,6 +169,27 @@ public void put(String path, RouterQuotaUsage quotaUsage) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update quota in cache. The usage will be preserved.
|
||||
* @param path Mount table path.
|
||||
* @param quota Corresponding quota value.
|
||||
*/
|
||||
public void updateQuota(String path, RouterQuotaUsage quota) {
|
||||
writeLock.lock();
|
||||
try {
|
||||
RouterQuotaUsage.Builder builder = new RouterQuotaUsage.Builder()
|
||||
.quota(quota.getQuota()).spaceQuota(quota.getSpaceQuota());
|
||||
RouterQuotaUsage current = this.cache.get(path);
|
||||
if (current != null) {
|
||||
builder.fileAndDirectoryCount(current.getFileAndDirectoryCount())
|
||||
.spaceConsumed(current.getSpaceConsumed());
|
||||
}
|
||||
this.cache.put(path, builder.build());
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the entity from cache.
|
||||
* @param path Mount table path.
|
||||
|
@ -35,15 +35,13 @@
|
||||
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Service to periodically update the {@link RouterQuotaUsage}
|
||||
* cached information in the {@link Router} and update corresponding
|
||||
* mount table in State Store.
|
||||
* cached information in the {@link Router}.
|
||||
*/
|
||||
public class RouterQuotaUpdateService extends PeriodicService {
|
||||
private static final Logger LOG =
|
||||
@ -81,7 +79,6 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
protected void periodicInvoke() {
|
||||
LOG.debug("Start to update quota cache.");
|
||||
try {
|
||||
List<MountTable> updateMountTables = new LinkedList<>();
|
||||
List<MountTable> mountTables = getQuotaSetMountTables();
|
||||
Map<RemoteLocation, QuotaUsage> remoteQuotaUsage = new HashMap<>();
|
||||
for (MountTable entry : mountTables) {
|
||||
@ -122,18 +119,6 @@ protected void periodicInvoke() {
|
||||
currentQuotaUsage);
|
||||
this.quotaManager.put(src, newQuota);
|
||||
entry.setQuota(newQuota);
|
||||
|
||||
// only update mount tables which quota was changed
|
||||
if (!oldQuota.equals(newQuota)) {
|
||||
updateMountTables.add(entry);
|
||||
|
||||
LOG.debug(
|
||||
"Update quota usage entity of path: {}, nsCount: {},"
|
||||
+ " nsQuota: {}, ssCount: {}, ssQuota: {}.",
|
||||
src, newQuota.getFileAndDirectoryCount(),
|
||||
newQuota.getQuota(), newQuota.getSpaceConsumed(),
|
||||
newQuota.getSpaceQuota());
|
||||
}
|
||||
}
|
||||
|
||||
// Fix inconsistent quota.
|
||||
@ -143,8 +128,6 @@ protected void periodicInvoke() {
|
||||
QuotaUsage currentQuota = en.getValue();
|
||||
fixGlobalQuota(remoteLocation, currentQuota);
|
||||
}
|
||||
|
||||
updateMountTableEntries(updateMountTables);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Quota cache updated error.", e);
|
||||
}
|
||||
@ -219,7 +202,7 @@ private List<MountTable> getQuotaSetMountTables() throws IOException {
|
||||
|
||||
// update mount table entries info in quota cache
|
||||
String src = entry.getSourcePath();
|
||||
this.quotaManager.put(src, entry.getQuota());
|
||||
this.quotaManager.updateQuota(src, entry.getQuota());
|
||||
stalePaths.remove(src);
|
||||
}
|
||||
|
||||
@ -258,23 +241,4 @@ private RouterQuotaUsage generateNewQuota(RouterQuotaUsage oldQuota,
|
||||
.spaceQuota(oldQuota.getSpaceQuota()).build();
|
||||
return newQuota;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write out updated mount table entries into State Store.
|
||||
* @param updateMountTables Mount tables to be updated.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void updateMountTableEntries(List<MountTable> updateMountTables)
|
||||
throws IOException {
|
||||
for (MountTable entry : updateMountTables) {
|
||||
UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest
|
||||
.newInstance(entry);
|
||||
try {
|
||||
getMountTableStore().updateMountTableEntry(updateRequest);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Quota update error for mount entry "
|
||||
+ entry.getSourcePath(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.MountTableRefresherService;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||
import org.slf4j.Logger;
|
||||
@ -48,7 +49,8 @@ public abstract class MountTableStore extends CachedRecordStore<MountTable>
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(MountTableStore.class);
|
||||
private MountTableRefresherService refreshService;
|
||||
|
||||
/** Router quota manager to update quota usage in mount table. */
|
||||
private RouterQuotaManager quotaManager;
|
||||
public MountTableStore(StateStoreDriver driver) {
|
||||
super(MountTable.class, driver);
|
||||
}
|
||||
@ -57,6 +59,14 @@ public void setRefreshService(MountTableRefresherService refreshService) {
|
||||
this.refreshService = refreshService;
|
||||
}
|
||||
|
||||
public void setQuotaManager(RouterQuotaManager quotaManager) {
|
||||
this.quotaManager = quotaManager;
|
||||
}
|
||||
|
||||
public RouterQuotaManager getQuotaManager() {
|
||||
return quotaManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update mount table cache of this router as well as all other routers.
|
||||
*/
|
||||
|
@ -29,6 +29,7 @@
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
|
||||
@ -153,6 +154,20 @@ public GetMountTableEntriesResponse getMountTableEntries(
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
// If quota manager is not null, update quota usage from quota cache.
|
||||
if (this.getQuotaManager() != null) {
|
||||
RouterQuotaUsage quota =
|
||||
this.getQuotaManager().getQuotaUsage(record.getSourcePath());
|
||||
if (quota != null) {
|
||||
RouterQuotaUsage oldquota = record.getQuota();
|
||||
RouterQuotaUsage newQuota = new RouterQuotaUsage.Builder()
|
||||
.fileAndDirectoryCount(quota.getFileAndDirectoryCount())
|
||||
.quota(oldquota.getQuota())
|
||||
.spaceConsumed(quota.getSpaceConsumed())
|
||||
.spaceQuota(oldquota.getSpaceQuota()).build();
|
||||
record.setQuota(newQuota);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -54,6 +54,7 @@
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
||||
@ -64,6 +65,7 @@
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
@ -482,6 +484,10 @@ private boolean removeMountTable(String path) throws IOException {
|
||||
return removeResponse.getStatus();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test {@link RouterQuotaUpdateService#periodicInvoke()} updates quota usage
|
||||
* in RouterQuotaManager.
|
||||
*/
|
||||
@Test
|
||||
public void testQuotaUpdating() throws Exception {
|
||||
long nsQuota = 30;
|
||||
@ -498,8 +504,7 @@ public void testQuotaUpdating() throws Exception {
|
||||
.spaceQuota(ssQuota).build());
|
||||
addMountTable(mountTable);
|
||||
|
||||
// Call periodicInvoke to ensure quota updated in quota manager
|
||||
// and state store.
|
||||
// Call periodicInvoke to ensure quota updated in quota manager.
|
||||
RouterQuotaUpdateService updateService = routerContext.getRouter()
|
||||
.getQuotaCacheUpdateService();
|
||||
updateService.periodicInvoke();
|
||||
@ -523,7 +528,7 @@ public void testQuotaUpdating() throws Exception {
|
||||
updatedMountTable = getMountTable(path);
|
||||
quota = updatedMountTable.getQuota();
|
||||
|
||||
// verify if quota has been updated in state store
|
||||
// verify if quota usage has been updated in quota manager.
|
||||
assertEquals(nsQuota, quota.getQuota());
|
||||
assertEquals(ssQuota, quota.getSpaceQuota());
|
||||
assertEquals(3, quota.getFileAndDirectoryCount());
|
||||
@ -697,10 +702,10 @@ public void testQuotaRefreshAfterQuotaExceed() throws Exception {
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify whether mount table and quota usage cache is updated properly.
|
||||
* Verify whether quota usage cache in RouterQuotaManager is updated properly.
|
||||
* {@link RouterQuotaUpdateService#periodicInvoke()} should be able to update
|
||||
* the cache and the mount table even if the destination directory for some
|
||||
* mount entry is not present in the filesystem.
|
||||
* the cache even if the destination directory for some mount entry is not
|
||||
* present in the filesystem.
|
||||
*/
|
||||
@Test
|
||||
public void testQuotaRefreshWhenDestinationNotPresent() throws Exception {
|
||||
@ -1006,6 +1011,104 @@ public void testGetQuotaUsageOnMountPoint() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* RouterQuotaUpdateService.periodicInvoke() should only update usage in
|
||||
* cache. The mount table in state store shouldn't be updated.
|
||||
*/
|
||||
@Test
|
||||
public void testRouterQuotaUpdateService() throws Exception {
|
||||
Router router = routerContext.getRouter();
|
||||
StateStoreDriver driver = router.getStateStore().getDriver();
|
||||
RouterQuotaUpdateService updateService =
|
||||
router.getQuotaCacheUpdateService();
|
||||
RouterQuotaManager quotaManager = router.getQuotaManager();
|
||||
long nsQuota = 5;
|
||||
long ssQuota = 3 * BLOCK_SIZE;
|
||||
final FileSystem nnFs = nnContext1.getFileSystem();
|
||||
nnFs.mkdirs(new Path("/dir-1"));
|
||||
|
||||
// init mount table.
|
||||
MountTable mountTable = MountTable.newInstance("/dir-1",
|
||||
Collections.singletonMap("ns0", "/dir-1"));
|
||||
mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
|
||||
.spaceQuota(ssQuota).build());
|
||||
addMountTable(mountTable);
|
||||
// verify the mount table in state store is updated.
|
||||
QueryResult<MountTable> result = driver.get(MountTable.class);
|
||||
RouterQuotaUsage quotaOnStorage = result.getRecords().get(0).getQuota();
|
||||
assertEquals(nsQuota, quotaOnStorage.getQuota());
|
||||
assertEquals(ssQuota, quotaOnStorage.getSpaceQuota());
|
||||
assertEquals(0, quotaOnStorage.getFileAndDirectoryCount());
|
||||
assertEquals(0, quotaOnStorage.getSpaceConsumed());
|
||||
|
||||
// test RouterQuotaUpdateService.periodicInvoke().
|
||||
updateService.periodicInvoke();
|
||||
|
||||
// RouterQuotaUpdateService should update usage in local cache.
|
||||
RouterQuotaUsage quotaUsage = quotaManager.getQuotaUsage("/dir-1");
|
||||
assertEquals(nsQuota, quotaUsage.getQuota());
|
||||
assertEquals(ssQuota, quotaUsage.getSpaceQuota());
|
||||
assertEquals(1, quotaUsage.getFileAndDirectoryCount());
|
||||
assertEquals(0, quotaUsage.getSpaceConsumed());
|
||||
// RouterQuotaUpdateService shouldn't update mount entry in state store.
|
||||
result = driver.get(MountTable.class);
|
||||
quotaOnStorage = result.getRecords().get(0).getQuota();
|
||||
assertEquals(nsQuota, quotaOnStorage.getQuota());
|
||||
assertEquals(ssQuota, quotaOnStorage.getSpaceQuota());
|
||||
assertEquals(0, quotaOnStorage.getFileAndDirectoryCount());
|
||||
assertEquals(0, quotaOnStorage.getSpaceConsumed());
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify whether quota is updated properly.
|
||||
* {@link RouterQuotaUpdateService#periodicInvoke()} should be able to update
|
||||
* the quota even if the destination directory for some mount entry is not
|
||||
* present in the filesystem.
|
||||
*/
|
||||
@Test
|
||||
public void testQuotaUpdateWhenDestinationNotPresent() throws Exception {
|
||||
long nsQuota = 5;
|
||||
long ssQuota = 3 * BLOCK_SIZE;
|
||||
String path = "/dst-not-present";
|
||||
final FileSystem nnFs = nnContext1.getFileSystem();
|
||||
|
||||
// Add one mount table:
|
||||
// /dst-not-present --> ns0---/dst-not-present.
|
||||
nnFs.mkdirs(new Path(path));
|
||||
MountTable mountTable =
|
||||
MountTable.newInstance(path, Collections.singletonMap("ns0", path));
|
||||
mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
|
||||
.spaceQuota(ssQuota).build());
|
||||
addMountTable(mountTable);
|
||||
|
||||
Router router = routerContext.getRouter();
|
||||
RouterQuotaManager quotaManager = router.getQuotaManager();
|
||||
RouterQuotaUpdateService updateService =
|
||||
router.getQuotaCacheUpdateService();
|
||||
// verify quota usage is updated in RouterQuotaManager.
|
||||
updateService.periodicInvoke();
|
||||
RouterQuotaUsage quotaUsage = quotaManager.getQuotaUsage(path);
|
||||
assertEquals(nsQuota, quotaUsage.getQuota());
|
||||
assertEquals(ssQuota, quotaUsage.getSpaceQuota());
|
||||
assertEquals(1, quotaUsage.getFileAndDirectoryCount());
|
||||
assertEquals(0, quotaUsage.getSpaceConsumed());
|
||||
|
||||
// Update quota to [nsQuota*2, ssQuota*2].
|
||||
mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota * 2)
|
||||
.spaceQuota(ssQuota * 2).build());
|
||||
updateMountTable(mountTable);
|
||||
// Remove /dst-not-present.
|
||||
nnFs.delete(new Path(path), true);
|
||||
|
||||
// verify quota is updated in RouterQuotaManager.
|
||||
updateService.periodicInvoke();
|
||||
quotaUsage = quotaManager.getQuotaUsage(path);
|
||||
assertEquals(nsQuota * 2, quotaUsage.getQuota());
|
||||
assertEquals(ssQuota * 2, quotaUsage.getSpaceQuota());
|
||||
assertEquals(0, quotaUsage.getFileAndDirectoryCount());
|
||||
assertEquals(0, quotaUsage.getSpaceConsumed());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add three mount tables.
|
||||
* /dir-1 --> ns0---/dir-1 [nsQuota, ssQuota]
|
||||
|
Loading…
Reference in New Issue
Block a user