HDFS-13346. RBF: Fix synchronization of router quota and nameservice quota.
This commit is contained in:
parent
9db9cd95bd
commit
a922b9c82c
@ -26,6 +26,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService;
|
import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
|
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB;
|
||||||
@ -54,6 +55,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
@ -228,7 +230,31 @@ public AddMountTableEntryResponse addMountTableEntry(
|
|||||||
@Override
|
@Override
|
||||||
public UpdateMountTableEntryResponse updateMountTableEntry(
|
public UpdateMountTableEntryResponse updateMountTableEntry(
|
||||||
UpdateMountTableEntryRequest request) throws IOException {
|
UpdateMountTableEntryRequest request) throws IOException {
|
||||||
return getMountTableStore().updateMountTableEntry(request);
|
UpdateMountTableEntryResponse response =
|
||||||
|
getMountTableStore().updateMountTableEntry(request);
|
||||||
|
|
||||||
|
MountTable mountTable = request.getEntry();
|
||||||
|
if (mountTable != null) {
|
||||||
|
synchronizeQuota(mountTable);
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Synchronize the quota value across mount table and subclusters.
|
||||||
|
* @param mountTable Quota set in given mount table.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void synchronizeQuota(MountTable mountTable) throws IOException {
|
||||||
|
String path = mountTable.getSourcePath();
|
||||||
|
long nsQuota = mountTable.getQuota().getQuota();
|
||||||
|
long ssQuota = mountTable.getQuota().getSpaceQuota();
|
||||||
|
|
||||||
|
if (nsQuota != HdfsConstants.QUOTA_DONT_SET
|
||||||
|
|| ssQuota != HdfsConstants.QUOTA_DONT_SET) {
|
||||||
|
this.router.getRpcServer().getQuotaModule().setQuota(path, nsQuota,
|
||||||
|
ssQuota, null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -51,6 +51,8 @@
|
|||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
|
|
||||||
@ -104,6 +106,14 @@ public static void globalSetUp() throws Exception {
|
|||||||
membership.registerNamenode(
|
membership.registerNamenode(
|
||||||
createNamenodeReport("ns1", "nn1", HAServiceState.ACTIVE));
|
createNamenodeReport("ns1", "nn1", HAServiceState.ACTIVE));
|
||||||
stateStore.refreshCaches(true);
|
stateStore.refreshCaches(true);
|
||||||
|
|
||||||
|
// Mock the quota module since no real namenode is started up.
|
||||||
|
Quota quota = Mockito
|
||||||
|
.spy(routerContext.getRouter().createRpcServer().getQuotaModule());
|
||||||
|
Mockito.doNothing().when(quota).setQuota(Mockito.anyString(),
|
||||||
|
Mockito.anyLong(), Mockito.anyLong(), Mockito.any());
|
||||||
|
Whitebox.setInternalState(
|
||||||
|
routerContext.getRouter().getRpcServer(), "quotaCall", quota);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
package org.apache.hadoop.hdfs.server.federation.router;
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -37,9 +38,9 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
|
||||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
|
||||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
||||||
@ -49,8 +50,10 @@
|
|||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -452,4 +455,42 @@ private List<MountTable> getMountTable(String path) throws IOException {
|
|||||||
|
|
||||||
return removeResponse.getEntries();
|
return removeResponse.getEntries();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQuotaSynchronization() throws IOException {
|
||||||
|
long updateNsQuota = 3;
|
||||||
|
long updateSsQuota = 4;
|
||||||
|
MountTable mountTable = MountTable.newInstance("/quotaSync",
|
||||||
|
Collections.singletonMap("ns0", "/"), Time.now(), Time.now());
|
||||||
|
mountTable.setQuota(new RouterQuotaUsage.Builder().quota(1)
|
||||||
|
.spaceQuota(2).build());
|
||||||
|
// Add new mount table
|
||||||
|
addMountTable(mountTable);
|
||||||
|
|
||||||
|
// ensure the quota is not set as updated value
|
||||||
|
QuotaUsage realQuota = nnContext1.getFileSystem()
|
||||||
|
.getQuotaUsage(new Path("/"));
|
||||||
|
assertNotEquals(updateNsQuota, realQuota.getQuota());
|
||||||
|
assertNotEquals(updateSsQuota, realQuota.getSpaceQuota());
|
||||||
|
|
||||||
|
// Call periodicInvoke to ensure quota updated in quota manager
|
||||||
|
// and state store.
|
||||||
|
RouterQuotaUpdateService updateService = routerContext.getRouter()
|
||||||
|
.getQuotaCacheUpdateService();
|
||||||
|
updateService.periodicInvoke();
|
||||||
|
|
||||||
|
mountTable.setQuota(new RouterQuotaUsage.Builder().quota(updateNsQuota)
|
||||||
|
.spaceQuota(updateSsQuota).build());
|
||||||
|
UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest
|
||||||
|
.newInstance(mountTable);
|
||||||
|
RouterClient client = routerContext.getAdminClient();
|
||||||
|
MountTableManager mountTableManager = client.getMountTableManager();
|
||||||
|
mountTableManager.updateMountTableEntry(updateRequest);
|
||||||
|
|
||||||
|
// verify if the quota is updated in real path
|
||||||
|
realQuota = nnContext1.getFileSystem().getQuotaUsage(
|
||||||
|
new Path("/"));
|
||||||
|
assertEquals(updateNsQuota, realQuota.getQuota());
|
||||||
|
assertEquals(updateSsQuota, realQuota.getSpaceQuota());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user