HDFS-16359. RBF: RouterRpcServer#invokeAtAvailableNs does not take effect when retrying (#3731). Contributed by tomscut.
Reviewed-by: Inigo Goiri <inigoiri@apache.org> Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
parent
b34dcb5b3a
commit
196935a8d2
@ -696,7 +696,7 @@ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
|
|||||||
ioe.getClass().getSimpleName());
|
ioe.getClass().getSimpleName());
|
||||||
throw ioe;
|
throw ioe;
|
||||||
}
|
}
|
||||||
Set<FederationNamespaceInfo> nssWithoutFailed = getNameSpaceInfo(nsId);
|
Set<FederationNamespaceInfo> nssWithoutFailed = getNameSpaceInfo(nss, nsId);
|
||||||
return invokeOnNs(method, clazz, ioe, nssWithoutFailed);
|
return invokeOnNs(method, clazz, ioe, nssWithoutFailed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -722,13 +722,15 @@ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe,
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Get set of namespace info's removing the already invoked namespaceinfo.
|
* Get set of namespace info's removing the already invoked namespaceinfo.
|
||||||
* @param nsId already invoked namespace id
|
* @param nss List of namespaces in the federation.
|
||||||
|
* @param nsId Already invoked namespace id.
|
||||||
* @return List of name spaces in the federation on
|
* @return List of name spaces in the federation on
|
||||||
* removing the already invoked namespaceinfo.
|
* removing the already invoked namespaceinfo.
|
||||||
*/
|
*/
|
||||||
private Set<FederationNamespaceInfo> getNameSpaceInfo(String nsId) {
|
private static Set<FederationNamespaceInfo> getNameSpaceInfo(
|
||||||
|
final Set<FederationNamespaceInfo> nss, final String nsId) {
|
||||||
Set<FederationNamespaceInfo> namespaceInfos = new HashSet<>();
|
Set<FederationNamespaceInfo> namespaceInfos = new HashSet<>();
|
||||||
for (FederationNamespaceInfo ns : namespaceInfos) {
|
for (FederationNamespaceInfo ns : nss) {
|
||||||
if (!nsId.equals(ns.getNameserviceId())) {
|
if (!nsId.equals(ns.getNameserviceId())) {
|
||||||
namespaceInfos.add(ns);
|
namespaceInfos.add(ns);
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
@ -39,6 +40,7 @@
|
|||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FsServerDefaults;
|
||||||
import org.apache.hadoop.fs.Options.Rename;
|
import org.apache.hadoop.fs.Options.Rename;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.AclEntry;
|
import org.apache.hadoop.fs.permission.AclEntry;
|
||||||
@ -643,6 +645,40 @@ public void testContentSummaryMultipleDestWithMaxValue()
|
|||||||
assertEquals(ssQuota, cs.getSpaceQuota());
|
assertEquals(ssQuota, cs.getSpaceQuota());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test RouterRpcServer#invokeAtAvailableNs on mount point with multiple destinations
|
||||||
|
* and making a one of the destination's subcluster unavailable.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testInvokeAtAvailableNs() throws IOException {
|
||||||
|
// Create a mount point with multiple destinations.
|
||||||
|
Path path = new Path("/testInvokeAtAvailableNs");
|
||||||
|
Map<String, String> destMap = new HashMap<>();
|
||||||
|
destMap.put("ns0", "/testInvokeAtAvailableNs");
|
||||||
|
destMap.put("ns1", "/testInvokeAtAvailableNs");
|
||||||
|
nnFs0.mkdirs(path);
|
||||||
|
nnFs1.mkdirs(path);
|
||||||
|
MountTable addEntry =
|
||||||
|
MountTable.newInstance("/testInvokeAtAvailableNs", destMap);
|
||||||
|
addEntry.setQuota(new RouterQuotaUsage.Builder().build());
|
||||||
|
addEntry.setDestOrder(DestinationOrder.RANDOM);
|
||||||
|
addEntry.setFaultTolerant(true);
|
||||||
|
assertTrue(addMountTable(addEntry));
|
||||||
|
|
||||||
|
// Make one subcluster unavailable.
|
||||||
|
MiniDFSCluster dfsCluster = cluster.getCluster();
|
||||||
|
dfsCluster.shutdownNameNode(0);
|
||||||
|
try {
|
||||||
|
// Verify that #invokeAtAvailableNs works by calling #getServerDefaults.
|
||||||
|
RemoteMethod method = new RemoteMethod("getServerDefaults");
|
||||||
|
FsServerDefaults serverDefaults =
|
||||||
|
rpcServer.invokeAtAvailableNs(method, FsServerDefaults.class);
|
||||||
|
assertNotNull(serverDefaults);
|
||||||
|
} finally {
|
||||||
|
dfsCluster.restartNameNode(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test write on mount point with multiple destinations
|
* Test write on mount point with multiple destinations
|
||||||
* and making a one of the destination's subcluster unavailable.
|
* and making a one of the destination's subcluster unavailable.
|
||||||
|
Loading…
Reference in New Issue
Block a user