HDFS-14440. RBF: Optimize the file write process in case of multiple destinations. Contributed by Ayush Saxena.
This commit is contained in:
parent
2636a54ffd
commit
8e4267650f
@ -631,28 +631,11 @@ RemoteLocation getCreateLocation(
|
||||
RemoteLocation createLocation = locations.get(0);
|
||||
if (locations.size() > 1) {
|
||||
try {
|
||||
// Check if this file already exists in other subclusters
|
||||
LocatedBlocks existingLocation = getBlockLocations(src, 0, 1);
|
||||
if (existingLocation != null) {
|
||||
RemoteLocation existingLocation = getExistingLocation(src, locations);
|
||||
// Forward to the existing location and let the NN handle the error
|
||||
LocatedBlock existingLocationLastLocatedBlock =
|
||||
existingLocation.getLastLocatedBlock();
|
||||
if (existingLocationLastLocatedBlock == null) {
|
||||
// The block has no blocks yet, check for the meta data
|
||||
for (RemoteLocation location : locations) {
|
||||
RemoteMethod method = new RemoteMethod("getFileInfo",
|
||||
new Class<?>[] {String.class}, new RemoteParam());
|
||||
if (rpcClient.invokeSingle(location, method) != null) {
|
||||
createLocation = location;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ExtendedBlock existingLocationLastBlock =
|
||||
existingLocationLastLocatedBlock.getBlock();
|
||||
String blockPoolId = existingLocationLastBlock.getBlockPoolId();
|
||||
createLocation = getLocationForPath(src, true, blockPoolId);
|
||||
}
|
||||
if (existingLocation != null) {
|
||||
LOG.debug("{} already exists in {}.", src, existingLocation);
|
||||
createLocation = existingLocation;
|
||||
}
|
||||
} catch (FileNotFoundException fne) {
|
||||
// Ignore if the file is not found
|
||||
@ -661,6 +644,27 @@ RemoteLocation getCreateLocation(
|
||||
return createLocation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the remote location where the file exists.
|
||||
* @param src the name of file.
|
||||
* @param locations all the remote locations.
|
||||
* @return the remote location of the file if it exists, else null.
|
||||
* @throws IOException in case of any exception.
|
||||
*/
|
||||
private RemoteLocation getExistingLocation(String src,
|
||||
List<RemoteLocation> locations) throws IOException {
|
||||
RemoteMethod method = new RemoteMethod("getFileInfo",
|
||||
new Class<?>[] {String.class}, new RemoteParam());
|
||||
Map<RemoteLocation, HdfsFileStatus> results = rpcClient.invokeConcurrent(
|
||||
locations, method, false, false, HdfsFileStatus.class);
|
||||
for (RemoteLocation loc : locations) {
|
||||
if (results.get(loc) != null) {
|
||||
return loc;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public LastBlockWithStatus append(String src, final String clientName,
|
||||
final EnumSetWritable<CreateFlag> flag) throws IOException {
|
||||
|
Loading…
Reference in New Issue
Block a user