Fixes HDFS-17181 by routing all CREATE requests to the BlockManager (#6108)
This commit is contained in:
parent
1336c362e5
commit
d1daf26b85
@ -36,6 +36,7 @@
|
|||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
import javax.servlet.ServletContext;
|
import javax.servlet.ServletContext;
|
||||||
@ -278,6 +279,9 @@ protected void queueExternalCall(ExternalCall call)
|
|||||||
namenode.queueExternalCall(call);
|
namenode.queueExternalCall(call);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Chooses a Datanode to redirect a request to.
|
||||||
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static DatanodeInfo chooseDatanode(final NameNode namenode,
|
static DatanodeInfo chooseDatanode(final NameNode namenode,
|
||||||
final String path, final HttpOpParam.Op op, final long openOffset,
|
final String path, final HttpOpParam.Op op, final long openOffset,
|
||||||
@ -289,17 +293,17 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
|
|||||||
}
|
}
|
||||||
final BlockManager bm = fsn.getBlockManager();
|
final BlockManager bm = fsn.getBlockManager();
|
||||||
|
|
||||||
HashSet<Node> excludes = new HashSet<Node>();
|
Set<Node> excludes = new HashSet<>();
|
||||||
if (excludeDatanodes != null) {
|
if (excludeDatanodes != null) {
|
||||||
for (String host : StringUtils
|
for (String host : StringUtils
|
||||||
.getTrimmedStringCollection(excludeDatanodes)) {
|
.getTrimmedStringCollection(excludeDatanodes)) {
|
||||||
int idx = host.indexOf(":");
|
int idx = host.indexOf(':');
|
||||||
Node excludeNode = null;
|
Node excludeNode = null;
|
||||||
if (idx != -1) {
|
if (idx == -1) {
|
||||||
|
excludeNode = bm.getDatanodeManager().getDatanodeByHost(host);
|
||||||
|
} else {
|
||||||
excludeNode = bm.getDatanodeManager().getDatanodeByXferAddr(
|
excludeNode = bm.getDatanodeManager().getDatanodeByXferAddr(
|
||||||
host.substring(0, idx), Integer.parseInt(host.substring(idx + 1)));
|
host.substring(0, idx), Integer.parseInt(host.substring(idx + 1)));
|
||||||
} else {
|
|
||||||
excludeNode = bm.getDatanodeManager().getDatanodeByHost(host);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (excludeNode != null) {
|
if (excludeNode != null) {
|
||||||
@ -311,25 +315,15 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (op == PutOpParam.Op.CREATE) {
|
// For these operations choose a datanode containing a replica
|
||||||
//choose a datanode near to client
|
if (op == GetOpParam.Op.OPEN
|
||||||
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
|
|
||||||
).getDatanodeByHost(remoteAddr);
|
|
||||||
if (clientNode != null) {
|
|
||||||
final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
|
|
||||||
path, clientNode, excludes, blocksize);
|
|
||||||
if (storages.length > 0) {
|
|
||||||
return storages[0].getDatanodeDescriptor();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (op == GetOpParam.Op.OPEN
|
|
||||||
|| op == GetOpParam.Op.GETFILECHECKSUM
|
|| op == GetOpParam.Op.GETFILECHECKSUM
|
||||||
|| op == PostOpParam.Op.APPEND) {
|
|| op == PostOpParam.Op.APPEND) {
|
||||||
//choose a datanode containing a replica
|
|
||||||
final NamenodeProtocols np = getRPCServer(namenode);
|
final NamenodeProtocols np = getRPCServer(namenode);
|
||||||
if (status == null) {
|
if (status == null) {
|
||||||
throw new FileNotFoundException("File " + path + " not found.");
|
throw new FileNotFoundException("File " + path + " not found.");
|
||||||
}
|
}
|
||||||
|
|
||||||
final long len = status.getLen();
|
final long len = status.getLen();
|
||||||
if (op == GetOpParam.Op.OPEN) {
|
if (op == GetOpParam.Op.OPEN) {
|
||||||
if (openOffset < 0L || (openOffset >= len && len > 0)) {
|
if (openOffset < 0L || (openOffset >= len && len > 0)) {
|
||||||
@ -344,10 +338,22 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
|
|||||||
final int count = locations.locatedBlockCount();
|
final int count = locations.locatedBlockCount();
|
||||||
if (count > 0) {
|
if (count > 0) {
|
||||||
return bestNode(locations.get(0).getLocations(), excludes);
|
return bestNode(locations.get(0).getLocations(), excludes);
|
||||||
|
} else {
|
||||||
|
throw new IOException("Block could not be located. Path=" + path + ", offset=" + offset);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// All other operations don't affect a specific node so let the BlockManager pick a target
|
||||||
|
DatanodeDescriptor clientNode = bm.getDatanodeManager(
|
||||||
|
).getDatanodeByHost(remoteAddr);
|
||||||
|
|
||||||
|
DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
|
||||||
|
path, clientNode, excludes, blocksize);
|
||||||
|
if (storages.length > 0) {
|
||||||
|
return storages[0].getDatanodeDescriptor();
|
||||||
|
}
|
||||||
|
|
||||||
return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology(
|
return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology(
|
||||||
).chooseRandom(NodeBase.ROOT, excludes);
|
).chooseRandom(NodeBase.ROOT, excludes);
|
||||||
}
|
}
|
||||||
@ -358,13 +364,13 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
|
|||||||
* to return the first element of the node here.
|
* to return the first element of the node here.
|
||||||
*/
|
*/
|
||||||
protected static DatanodeInfo bestNode(DatanodeInfo[] nodes,
|
protected static DatanodeInfo bestNode(DatanodeInfo[] nodes,
|
||||||
HashSet<Node> excludes) throws IOException {
|
Set<Node> excludes) throws IOException {
|
||||||
for (DatanodeInfo dn: nodes) {
|
for (DatanodeInfo dn: nodes) {
|
||||||
if (false == dn.isDecommissioned() && false == excludes.contains(dn)) {
|
if (!dn.isDecommissioned() && !excludes.contains(dn)) {
|
||||||
return dn;
|
return dn;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw new IOException("No active nodes contain this block");
|
throw new IOException("No active and not excluded nodes contain this block");
|
||||||
}
|
}
|
||||||
|
|
||||||
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
|
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
|
||||||
|
Loading…
Reference in New Issue
Block a user