HDFS-15535. RBF: Fix Namespace path to snapshot path resolution for snapshot API. Contributed by Ayush Saxena.
This commit is contained in:
parent
9b9f7ea16a
commit
4813a37023
@ -849,6 +849,45 @@ public <T> T invokeSequential(
|
|||||||
final List<? extends RemoteLocationContext> locations,
|
final List<? extends RemoteLocationContext> locations,
|
||||||
final RemoteMethod remoteMethod, Class<T> expectedResultClass,
|
final RemoteMethod remoteMethod, Class<T> expectedResultClass,
|
||||||
Object expectedResultValue) throws IOException {
|
Object expectedResultValue) throws IOException {
|
||||||
|
return (T) invokeSequential(remoteMethod, locations, expectedResultClass,
|
||||||
|
expectedResultValue).getResult();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invokes sequential proxy calls to different locations. Continues to invoke
|
||||||
|
* calls until the success condition is met, or until all locations have been
|
||||||
|
* attempted.
|
||||||
|
*
|
||||||
|
* The success condition may be specified by:
|
||||||
|
* <ul>
|
||||||
|
* <li>An expected result class
|
||||||
|
* <li>An expected result value
|
||||||
|
* </ul>
|
||||||
|
*
|
||||||
|
* If no expected result class/values are specified, the success condition is
|
||||||
|
* a call that does not throw a remote exception.
|
||||||
|
*
|
||||||
|
* This returns RemoteResult, which contains the invoked location as well
|
||||||
|
* as the result.
|
||||||
|
*
|
||||||
|
* @param <R> The type of the remote location.
|
||||||
|
* @param <T> The type of the remote method return.
|
||||||
|
* @param remoteMethod The remote method and parameters to invoke.
|
||||||
|
* @param locations List of locations/nameservices to call concurrently.
|
||||||
|
* @param expectedResultClass In order to be considered a positive result, the
|
||||||
|
* return type must be of this class.
|
||||||
|
* @param expectedResultValue In order to be considered a positive result, the
|
||||||
|
* return value must equal the value of this object.
|
||||||
|
* @return The result of the first successful call, or if no calls are
|
||||||
|
* successful, the result of the first RPC call executed, along with
|
||||||
|
* the invoked location in form of RemoteResult.
|
||||||
|
* @throws IOException if the success condition is not met, return the first
|
||||||
|
* remote exception generated.
|
||||||
|
*/
|
||||||
|
public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
|
||||||
|
final RemoteMethod remoteMethod, final List<R> locations,
|
||||||
|
Class<T> expectedResultClass, Object expectedResultValue)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
|
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
|
||||||
final Method m = remoteMethod.getMethod();
|
final Method m = remoteMethod.getMethod();
|
||||||
@ -867,9 +906,9 @@ public <T> T invokeSequential(
|
|||||||
if (isExpectedClass(expectedResultClass, result) &&
|
if (isExpectedClass(expectedResultClass, result) &&
|
||||||
isExpectedValue(expectedResultValue, result)) {
|
isExpectedValue(expectedResultValue, result)) {
|
||||||
// Valid result, stop here
|
// Valid result, stop here
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked") R location = (R) loc;
|
||||||
T ret = (T)result;
|
@SuppressWarnings("unchecked") T ret = (T) result;
|
||||||
return ret;
|
return new RemoteResult<>(location, ret);
|
||||||
}
|
}
|
||||||
if (firstResult == null) {
|
if (firstResult == null) {
|
||||||
firstResult = result;
|
firstResult = result;
|
||||||
@ -907,9 +946,8 @@ public <T> T invokeSequential(
|
|||||||
throw thrownExceptions.get(0);
|
throw thrownExceptions.get(0);
|
||||||
}
|
}
|
||||||
// Return the first result, whether it is the value or not
|
// Return the first result, whether it is the value or not
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked") T ret = (T) firstResult;
|
||||||
T ret = (T)firstResult;
|
return new RemoteResult<>(locations.get(0), ret);
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -104,10 +104,12 @@ public String createSnapshot(String snapshotRoot, String snapshotName)
|
|||||||
result = firstelement.getValue();
|
result = firstelement.getValue();
|
||||||
result = result.replaceFirst(loc.getDest(), loc.getSrc());
|
result = result.replaceFirst(loc.getDest(), loc.getSrc());
|
||||||
} else {
|
} else {
|
||||||
result = rpcClient.invokeSequential(
|
RemoteResult<RemoteLocation, String> response =
|
||||||
locations, method, String.class, null);
|
rpcClient.invokeSequential(method, locations, String.class, null);
|
||||||
RemoteLocation loc = locations.get(0);
|
RemoteLocation loc = response.getLocation();
|
||||||
result = result.replaceFirst(loc.getDest(), loc.getSrc());
|
String invokedResult = response.getResult();
|
||||||
|
result = invokedResult
|
||||||
|
.replaceFirst(loc.getDest(), loc.getSrc());
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
@ -180,9 +182,11 @@ public SnapshotStatus[] getSnapshotListing(String snapshotRoot)
|
|||||||
s.setParentFullPath(DFSUtil.string2Bytes(mountPath));
|
s.setParentFullPath(DFSUtil.string2Bytes(mountPath));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
response = rpcClient.invokeSequential(
|
RemoteResult<RemoteLocation, SnapshotStatus[]> invokedResponse = rpcClient
|
||||||
locations, remoteMethod, SnapshotStatus[].class, null);
|
.invokeSequential(remoteMethod, locations, SnapshotStatus[].class,
|
||||||
RemoteLocation loc = locations.get(0);
|
null);
|
||||||
|
RemoteLocation loc = invokedResponse.getLocation();
|
||||||
|
response = invokedResponse.getResult();
|
||||||
for (SnapshotStatus s : response) {
|
for (SnapshotStatus s : response) {
|
||||||
String mountPath = DFSUtil.bytes2String(s.getParentFullPath()).
|
String mountPath = DFSUtil.bytes2String(s.getParentFullPath()).
|
||||||
replaceFirst(loc.getDest(), loc.getSrc());
|
replaceFirst(loc.getDest(), loc.getSrc());
|
||||||
|
@ -43,6 +43,7 @@
|
|||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
|
||||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
||||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||||
@ -492,6 +493,38 @@ public void testIsMultiDestDir() throws Exception {
|
|||||||
assertFalse(client.isMultiDestDirectory("/mount/dir"));
|
assertFalse(client.isMultiDestDirectory("/mount/dir"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies the snapshot location returned after snapshot operations is in
|
||||||
|
* accordance to the mount path.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSnapshotPathResolution() throws Exception {
|
||||||
|
// Create a mount entry with non isPathAll order, so as to call
|
||||||
|
// invokeSequential.
|
||||||
|
Map<String, String> destMap = new HashMap<>();
|
||||||
|
destMap.put("ns0", "/tmp_ns0");
|
||||||
|
destMap.put("ns1", "/tmp_ns1");
|
||||||
|
nnFs0.mkdirs(new Path("/tmp_ns0"));
|
||||||
|
nnFs1.mkdirs(new Path("/tmp_ns1"));
|
||||||
|
MountTable addEntry = MountTable.newInstance("/mountSnap", destMap);
|
||||||
|
addEntry.setDestOrder(DestinationOrder.HASH);
|
||||||
|
assertTrue(addMountTable(addEntry));
|
||||||
|
// Create the actual directory in the destination second in sequence of
|
||||||
|
// invokeSequential.
|
||||||
|
nnFs0.mkdirs(new Path("/tmp_ns0/snapDir"));
|
||||||
|
Path snapDir = new Path("/mountSnap/snapDir");
|
||||||
|
Path snapshotPath = new Path("/mountSnap/snapDir/.snapshot/snap");
|
||||||
|
routerFs.allowSnapshot(snapDir);
|
||||||
|
// Verify the snapshot path returned after createSnapshot is as per mount
|
||||||
|
// path.
|
||||||
|
Path snapshot = routerFs.createSnapshot(snapDir, "snap");
|
||||||
|
assertEquals(snapshotPath, snapshot);
|
||||||
|
// Verify the snapshot path returned as part of snapshotListing is as per
|
||||||
|
// mount path.
|
||||||
|
SnapshotStatus[] snapshots = routerFs.getSnapshotListing(snapDir);
|
||||||
|
assertEquals(snapshotPath, snapshots[0].getFullPath());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRenameMultipleDestDirectories() throws Exception {
|
public void testRenameMultipleDestDirectories() throws Exception {
|
||||||
// Test renaming directories using rename API.
|
// Test renaming directories using rename API.
|
||||||
|
Loading…
Reference in New Issue
Block a user