diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 7350583264..d506482190 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -106,6 +106,7 @@ import java.net.ConnectException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; @@ -667,39 +668,28 @@ public void rename2(final String src, final String dst, public void concat(String trg, String[] src) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.WRITE); - // See if the src and target files are all in the same namespace - LocatedBlocks targetBlocks = getBlockLocations(trg, 0, 1); - if (targetBlocks == null) { - throw new IOException("Cannot locate blocks for target file - " + trg); - } - LocatedBlock lastLocatedBlock = targetBlocks.getLastLocatedBlock(); - String targetBlockPoolId = lastLocatedBlock.getBlock().getBlockPoolId(); - for (String source : src) { - LocatedBlocks sourceBlocks = getBlockLocations(source, 0, 1); - if (sourceBlocks == null) { - throw new IOException( - "Cannot located blocks for source file " + source); - } - String sourceBlockPoolId = - sourceBlocks.getLastLocatedBlock().getBlock().getBlockPoolId(); - if (!sourceBlockPoolId.equals(targetBlockPoolId)) { - throw new IOException("Cannot concatenate source file " + source - + " because it is located in a different namespace" - + " with block pool id " + sourceBlockPoolId - + " from the target file with block pool id " - + targetBlockPoolId); - } + // Concat only effects when all files in the same namespace. + RemoteLocation targetDestination = getFileRemoteLocation(trg); + if (targetDestination == null) { + throw new IOException("Cannot find target file - " + trg); } + String targetNameService = targetDestination.getNameserviceId(); - // Find locations in the matching namespace. - final RemoteLocation targetDestination = - rpcServer.getLocationForPath(trg, true, targetBlockPoolId); String[] sourceDestinations = new String[src.length]; for (int i = 0; i < src.length; i++) { String sourceFile = src[i]; - RemoteLocation location = - rpcServer.getLocationForPath(sourceFile, true, targetBlockPoolId); - sourceDestinations[i] = location.getDest(); + RemoteLocation srcLocation = getFileRemoteLocation(sourceFile); + if (srcLocation == null) { + throw new IOException("Cannot find source file - " + sourceFile); + } + sourceDestinations[i] = srcLocation.getDest(); + + if (!targetNameService.equals(srcLocation.getNameserviceId())) { + throw new IOException("Cannot concatenate source file " + sourceFile + + " because it is located in a different namespace" + " with nameservice " + + srcLocation.getNameserviceId() + " from the target file with nameservice " + + targetNameService); + } } // Invoke RemoteMethod method = new RemoteMethod("concat", @@ -1009,6 +999,28 @@ public HdfsFileStatus getFileInfo(String src) throws IOException { return ret; } + public RemoteLocation getFileRemoteLocation(String path) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + final List locations = rpcServer.getLocationsForPath(path, false, false); + if (locations.size() == 1) { + return locations.get(0); + } + RemoteLocation remoteLocation = null; + for (RemoteLocation location : locations) { + RemoteMethod method = + new RemoteMethod("getFileInfo", new Class[] {String.class}, new RemoteParam()); + HdfsFileStatus ret = rpcClient.invokeSequential(Collections.singletonList(location), method, + HdfsFileStatus.class, null); + if (ret != null) { + remoteLocation = location; + break; + } + } + + return remoteLocation; + } + @Override public boolean isFileClosed(String src) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index fe1323c4b5..217c62ff28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -1697,42 +1697,6 @@ public Long getNextSPSPath() throws IOException { return nnProto.getNextSPSPath(); } - /** - * Locate the location with the matching block pool id. - * - * @param path Path to check. - * @param failIfLocked Fail the request if locked (top mount point). - * @param blockPoolId The block pool ID of the namespace to search for. - * @return Prioritized list of locations in the federated cluster. - * @throws IOException if the location for this path cannot be determined. - */ - protected RemoteLocation getLocationForPath( - String path, boolean failIfLocked, String blockPoolId) - throws IOException { - - final List locations = - getLocationsForPath(path, failIfLocked); - - String nameserviceId = null; - Set namespaces = - this.namenodeResolver.getNamespaces(); - for (FederationNamespaceInfo namespace : namespaces) { - if (namespace.getBlockPoolId().equals(blockPoolId)) { - nameserviceId = namespace.getNameserviceId(); - break; - } - } - if (nameserviceId != null) { - for (RemoteLocation location : locations) { - if (location.getNameserviceId().equals(nameserviceId)) { - return location; - } - } - } - throw new IOException( - "Cannot locate a nameservice for block pool " + blockPoolId); - } - /** * Get the possible locations of a path in the federated cluster. * During the get operation, it will do the quota verification. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index 766a035151..c84dd2ceb2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -1163,6 +1163,21 @@ public void testProxyGetPreferedBlockSize() throws Exception { routerProtocol, nnProtocol, m, new Object[] {badPath}); } + private void testConcat( + String source, String target, boolean failureExpected, boolean verfiyException, String msg) { + boolean failure = false; + try { + // Concat test file with fill block length file via router + routerProtocol.concat(target, new String[] {source}); + } catch (IOException ex) { + failure = true; + if (verfiyException) { + assertExceptionContains(msg, ex); + } + } + assertEquals(failureExpected, failure); + } + private void testConcat( String source, String target, boolean failureExpected) { boolean failure = false; @@ -1224,6 +1239,27 @@ public void testProxyConcatFile() throws Exception { String badPath = "/unknownlocation/unknowndir"; compareResponses(routerProtocol, nnProtocol, m, new Object[] {badPath, new String[] {routerFile}}); + + // Test when concat trg is a empty file + createFile(routerFS, existingFile, existingFileSize); + String sameRouterEmptyFile = + cluster.getFederatedTestDirectoryForNS(sameNameservice) + + "_newemptyfile"; + createFile(routerFS, sameRouterEmptyFile, 0); + // Concat in same namespaces, succeeds + testConcat(existingFile, sameRouterEmptyFile, false); + FileStatus mergedStatus = getFileStatus(routerFS, sameRouterEmptyFile); + assertEquals(existingFileSize, mergedStatus.getLen()); + + // Test when concat srclist has some empty file, namenode will throw IOException. + String srcEmptyFile = cluster.getFederatedTestDirectoryForNS(sameNameservice) + "_srcEmptyFile"; + createFile(routerFS, srcEmptyFile, 0); + String targetFile = cluster.getFederatedTestDirectoryForNS(sameNameservice) + "_targetFile"; + createFile(routerFS, targetFile, existingFileSize); + // Concat in same namespaces, succeeds + testConcat(srcEmptyFile, targetFile, true, true, + "org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException): concat: source file " + + srcEmptyFile + " is invalid or empty or underConstruction"); } @Test