HDFS-14710. RBF: Improve some RPC performance by using previous block. Contributed by xuzq.

This commit is contained in:
Inigo Goiri 2019-08-28 10:48:00 -07:00
parent c7d426daf0
commit 48cb583906
3 changed files with 126 additions and 14 deletions

View File

@ -263,7 +263,8 @@ public HdfsFileStatus create(String src, FsPermission masked,
RemoteLocation createLocation = null; RemoteLocation createLocation = null;
try { try {
createLocation = rpcServer.getCreateLocation(src); createLocation = rpcServer.getCreateLocation(src);
return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method); return rpcClient.invokeSingle(createLocation, method,
HdfsFileStatus.class);
} catch (IOException ioe) { } catch (IOException ioe) {
final List<RemoteLocation> newLocations = checkFaultTolerantRetry( final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
method, src, ioe, createLocation, locations); method, src, ioe, createLocation, locations);
@ -299,7 +300,7 @@ private static boolean isUnavailableSubclusterException(
* locations to retry in. This is used by fault tolerant mount points. * locations to retry in. This is used by fault tolerant mount points.
* @param method Method that failed and might be retried. * @param method Method that failed and might be retried.
* @param src Path where the method was invoked. * @param src Path where the method was invoked.
* @param e Exception that was triggered. * @param ioe Exception that was triggered.
* @param excludeLoc Location that failed and should be excluded. * @param excludeLoc Location that failed and should be excluded.
* @param locations All the locations to retry. * @param locations All the locations to retry.
* @return The locations where we should retry (excluding the failed ones). * @return The locations where we should retry (excluding the failed ones).
@ -441,14 +442,19 @@ public LocatedBlock addBlock(String src, String clientName,
throws IOException { throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE); rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, true);
RemoteMethod method = new RemoteMethod("addBlock", RemoteMethod method = new RemoteMethod("addBlock",
new Class<?>[] {String.class, String.class, ExtendedBlock.class, new Class<?>[] {String.class, String.class, ExtendedBlock.class,
DatanodeInfo[].class, long.class, String[].class, DatanodeInfo[].class, long.class, String[].class,
EnumSet.class}, EnumSet.class},
new RemoteParam(), clientName, previous, excludedNodes, fileId, new RemoteParam(), clientName, previous, excludedNodes, fileId,
favoredNodes, addBlockFlags); favoredNodes, addBlockFlags);
if (previous != null) {
return rpcClient.invokeSingle(previous, method, LocatedBlock.class);
}
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, true);
// TODO verify the excludedNodes and favoredNodes are acceptable to this NN // TODO verify the excludedNodes and favoredNodes are acceptable to this NN
return rpcClient.invokeSequential( return rpcClient.invokeSequential(
locations, method, LocatedBlock.class, null); locations, method, LocatedBlock.class, null);
@ -466,14 +472,19 @@ public LocatedBlock getAdditionalDatanode(final String src, final long fileId,
throws IOException { throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ); rpcServer.checkOperation(NameNode.OperationCategory.READ);
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, false);
RemoteMethod method = new RemoteMethod("getAdditionalDatanode", RemoteMethod method = new RemoteMethod("getAdditionalDatanode",
new Class<?>[] {String.class, long.class, ExtendedBlock.class, new Class<?>[] {String.class, long.class, ExtendedBlock.class,
DatanodeInfo[].class, String[].class, DatanodeInfo[].class, String[].class,
DatanodeInfo[].class, int.class, String.class}, DatanodeInfo[].class, int.class, String.class},
new RemoteParam(), fileId, blk, existings, existingStorageIDs, excludes, new RemoteParam(), fileId, blk, existings, existingStorageIDs, excludes,
numAdditionalNodes, clientName); numAdditionalNodes, clientName);
if (blk != null) {
return rpcClient.invokeSingle(blk, method, LocatedBlock.class);
}
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, false);
return rpcClient.invokeSequential( return rpcClient.invokeSequential(
locations, method, LocatedBlock.class, null); locations, method, LocatedBlock.class, null);
} }
@ -495,12 +506,17 @@ public boolean complete(String src, String clientName, ExtendedBlock last,
long fileId) throws IOException { long fileId) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE); rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, true);
RemoteMethod method = new RemoteMethod("complete", RemoteMethod method = new RemoteMethod("complete",
new Class<?>[] {String.class, String.class, ExtendedBlock.class, new Class<?>[] {String.class, String.class, ExtendedBlock.class,
long.class}, long.class},
new RemoteParam(), clientName, last, fileId); new RemoteParam(), clientName, last, fileId);
if (last != null) {
return rpcClient.invokeSingle(last, method, Boolean.class);
}
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, true);
// Complete can return true/false, so don't expect a result // Complete can return true/false, so don't expect a result
return rpcClient.invokeSequential(locations, method, Boolean.class, null); return rpcClient.invokeSequential(locations, method, Boolean.class, null);
} }
@ -513,7 +529,7 @@ public LocatedBlock updateBlockForPipeline(
RemoteMethod method = new RemoteMethod("updateBlockForPipeline", RemoteMethod method = new RemoteMethod("updateBlockForPipeline",
new Class<?>[] {ExtendedBlock.class, String.class}, new Class<?>[] {ExtendedBlock.class, String.class},
block, clientName); block, clientName);
return (LocatedBlock) rpcClient.invokeSingle(block, method); return rpcClient.invokeSingle(block, method, LocatedBlock.class);
} }
/** /**
@ -638,7 +654,7 @@ public void concat(String trg, String[] src) throws IOException {
RemoteMethod method = new RemoteMethod("concat", RemoteMethod method = new RemoteMethod("concat",
new Class<?>[] {String.class, String[].class}, new Class<?>[] {String.class, String[].class},
targetDestination.getDest(), sourceDestinations); targetDestination.getDest(), sourceDestinations);
rpcClient.invokeSingle(targetDestination, method); rpcClient.invokeSingle(targetDestination, method, Void.class);
} }
@Override @Override
@ -705,7 +721,7 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
final RemoteLocation firstLocation = locations.get(0); final RemoteLocation firstLocation = locations.get(0);
try { try {
return (boolean) rpcClient.invokeSingle(firstLocation, method); return rpcClient.invokeSingle(firstLocation, method, Boolean.class);
} catch (IOException ioe) { } catch (IOException ioe) {
final List<RemoteLocation> newLocations = checkFaultTolerantRetry( final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
method, src, ioe, firstLocation, locations); method, src, ioe, firstLocation, locations);

View File

@ -731,6 +731,27 @@ public <T> T invokeSingle(final String nsId, RemoteMethod method,
return ret; return ret;
} }
/**
* Invokes a remote method against the specified extendedBlock.
*
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param <T> The type of the remote method return.
* @param extendedBlock Target extendedBlock for the method.
* @param method The remote method and parameters to invoke.
* @param clazz Class for the return type.
* @return The result of invoking the method.
* @throws IOException If the invoke generated an error.
*/
public <T> T invokeSingle(final ExtendedBlock extendedBlock,
RemoteMethod method, Class<T> clazz) throws IOException {
String nsId = getNameserviceForBlockPoolId(extendedBlock.getBlockPoolId());
@SuppressWarnings("unchecked")
T ret = (T)invokeSingle(nsId, method);
return ret;
}
/** /**
* Invokes a single proxy call for a single location. * Invokes a single proxy call for a single location.
* *
@ -742,10 +763,12 @@ public <T> T invokeSingle(final String nsId, RemoteMethod method,
* @return The result of invoking the method if successful. * @return The result of invoking the method if successful.
* @throws IOException If the invoke generated an error. * @throws IOException If the invoke generated an error.
*/ */
public Object invokeSingle(final RemoteLocationContext location, public <T> T invokeSingle(final RemoteLocationContext location,
RemoteMethod remoteMethod) throws IOException { RemoteMethod remoteMethod, Class<T> clazz) throws IOException {
List<RemoteLocationContext> locations = Collections.singletonList(location); List<RemoteLocationContext> locations = Collections.singletonList(location);
return invokeSequential(locations, remoteMethod); @SuppressWarnings("unchecked")
T ret = (T)invokeSequential(locations, remoteMethod);
return ret;
} }
/** /**

View File

@ -21,6 +21,7 @@
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
@ -33,30 +34,38 @@
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.federation.MockResolver; import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
@ -233,6 +242,70 @@ public void testProxyRenameFiles() throws IOException, InterruptedException {
testRename2(getRouterContext(), filename1, renamedFile, false); testRename2(getRouterContext(), filename1, renamedFile, false);
} }
/**
* Verify some rpc with previous block not null.
*/
@Test
public void testPreviousBlockNotNull()
throws IOException, URISyntaxException {
final FederationRPCMetrics metrics = getRouterContext().
getRouter().getRpcServer().getRPCMetrics();
final ClientProtocol clientProtocol = getRouterProtocol();
final EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.CREATE,
CreateFlag.OVERWRITE);
final String clientName = getRouterContext().getClient().getClientName();
final String testPath = "/getAdditionalData/test.txt";
final String ns1 = getCluster().getNameservices().get(1);
final FileSystem fileSystem1 = getCluster().
getNamenode(ns1, null).getFileSystem();
try {
// Create the test file in NS1.
createFile(fileSystem1, testPath, 32);
// Crate the test file via Router to get file status.
HdfsFileStatus status = clientProtocol.create(
testPath, new FsPermission("777"), clientName,
new EnumSetWritable<>(createFlag), true, (short) 1,
(long) 1024, CryptoProtocolVersion.supported(), null, null);
long proxyNumCreate = metrics.getProcessingOps();
// Add a block via router and previous block is null.
LocatedBlock blockOne = clientProtocol.addBlock(
testPath, clientName, null, null,
status.getFileId(), null, null);
assertNotNull(blockOne);
long proxyNumAddBlock = metrics.getProcessingOps();
assertEquals(2, proxyNumAddBlock - proxyNumCreate);
// Add a block via router and previous block is not null.
LocatedBlock blockTwo = clientProtocol.addBlock(
testPath, clientName, blockOne.getBlock(), null,
status.getFileId(), null, null);
assertNotNull(blockTwo);
long proxyNumAddBlock2 = metrics.getProcessingOps();
assertEquals(1, proxyNumAddBlock2 - proxyNumAddBlock);
// Get additionalDatanode via router and block is not null.
DatanodeInfo[] exclusions = new DatanodeInfo[0];
LocatedBlock newBlock = clientProtocol.getAdditionalDatanode(
testPath, status.getFileId(), blockTwo.getBlock(),
blockTwo.getLocations(), blockTwo.getStorageIDs(), exclusions,
1, clientName);
assertNotNull(newBlock);
long proxyNumAdditionalDatanode = metrics.getProcessingOps();
assertEquals(1, proxyNumAdditionalDatanode - proxyNumAddBlock2);
// Complete the file via router and last block is not null.
clientProtocol.complete(testPath, clientName,
newBlock.getBlock(), status.getFileId());
long proxyNumComplete = metrics.getProcessingOps();
assertEquals(1, proxyNumComplete - proxyNumAdditionalDatanode);
} finally {
clientProtocol.delete(testPath, true);
}
}
/** /**
* Test recoverLease when the result is false. * Test recoverLease when the result is false.
*/ */