HDFS-13448. HDFS Block Placement - Ignore Locality for First Block Replica

(Contributed by BELUGA BEHR via Daniel Templeton)

Change-Id: I965d1cfa642ad24296038b83e3d5c9983545267d
This commit is contained in:
Daniel Templeton 2018-07-24 15:34:19 -07:00
parent 6bec03cfc8
commit 849c45db18
8 changed files with 134 additions and 14 deletions

View File

@ -116,7 +116,14 @@ public enum CreateFlag {
* Enforce the file to be a replicated file, no matter what its parent
* directory's replication or erasure coding policy is.
*/
SHOULD_REPLICATE((short) 0x80);
SHOULD_REPLICATE((short) 0x80),
/**
* Advise that the first block replica NOT take into account DataNode
* locality. The first block replica should be placed randomly within the
* cluster. Subsequent block replicas should follow DataNode locality rules.
*/
IGNORE_CLIENT_LOCALITY((short) 0x100);
private final short mode;

View File

@ -36,7 +36,16 @@ public enum AddBlockFlag {
*
* @see CreateFlag#NO_LOCAL_WRITE
*/
NO_LOCAL_WRITE((short) 0x01);
NO_LOCAL_WRITE((short) 0x01),
/**
* Advise that the first block replica NOT take into account DataNode
* locality. The first block replica should be placed randomly within the
* cluster. Subsequent block replicas should follow DataNode locality rules.
*
* @see CreateFlag#IGNORE_CLIENT_LOCALITY
*/
IGNORE_CLIENT_LOCALITY((short) 0x02);
private final short mode;

View File

@ -201,6 +201,9 @@ private DFSOutputStream(DFSClient dfsClient, String src,
if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) {
this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE);
}
if (flag.contains(CreateFlag.IGNORE_CLIENT_LOCALITY)) {
this.addBlockFlags.add(AddBlockFlag.IGNORE_CLIENT_LOCALITY);
}
if (progress != null) {
DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "
+"{}", src);

View File

@ -3205,6 +3205,17 @@ public HdfsDataOutputStreamBuilder replicate() {
return this;
}
/**
* Advise that the first block replica be written without regard to the
* client locality.
*
* @see CreateFlag for the details.
*/
public HdfsDataOutputStreamBuilder ignoreClientLocality() {
getFlags().add(CreateFlag.IGNORE_CLIENT_LOCALITY);
return this;
}
@VisibleForTesting
@Override
protected EnumSet<CreateFlag> getFlags() {

View File

@ -167,6 +167,7 @@ message AbandonBlockResponseProto { // void response
enum AddBlockFlagProto {
NO_LOCAL_WRITE = 1; // avoid writing to local node.
IGNORE_CLIENT_LOCALITY = 2; // write to a random node
}
message AddBlockRequestProto {

View File

@ -280,7 +280,9 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
if (avoidLocalNode) {
results = new ArrayList<>(chosenStorage);
Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
excludedNodeCopy.add(writer);
if (writer != null) {
excludedNodeCopy.add(writer);
}
localNode = chooseTarget(numOfReplicas, writer,
excludedNodeCopy, blocksize, maxNodesPerRack, results,
avoidStaleNodes, storagePolicy,

View File

@ -269,19 +269,27 @@ static DatanodeStorageInfo[] chooseTargetForNewBlock(
BlockManager bm, String src, DatanodeInfo[] excludedNodes,
String[] favoredNodes, EnumSet<AddBlockFlag> flags,
ValidateAddBlockResult r) throws IOException {
Node clientNode = bm.getDatanodeManager()
.getDatanodeByHost(r.clientMachine);
if (clientNode == null) {
clientNode = getClientNode(bm, r.clientMachine);
Node clientNode = null;
boolean ignoreClientLocality = (flags != null
&& flags.contains(AddBlockFlag.IGNORE_CLIENT_LOCALITY));
// If client locality is ignored, clientNode remains 'null' to indicate
if (!ignoreClientLocality) {
clientNode = bm.getDatanodeManager().getDatanodeByHost(r.clientMachine);
if (clientNode == null) {
clientNode = getClientNode(bm, r.clientMachine);
}
}
Set<Node> excludedNodesSet = null;
if (excludedNodes != null) {
excludedNodesSet = new HashSet<>(excludedNodes.length);
Collections.addAll(excludedNodesSet, excludedNodes);
}
List<String> favoredNodesList = (favoredNodes == null) ? null
: Arrays.asList(favoredNodes);
Set<Node> excludedNodesSet =
(excludedNodes == null) ? new HashSet<>()
: new HashSet<>(Arrays.asList(excludedNodes));
List<String> favoredNodesList =
(favoredNodes == null) ? Collections.emptyList()
: Arrays.asList(favoredNodes);
// choose targets for the new block to be allocated.
return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
excludedNodesSet, r.blockSize,

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyByte;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anySet;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.ValidateAddBlockResult;
import org.apache.hadoop.net.Node;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class TestFSDirWriteFileOp {
@Test
@SuppressWarnings("unchecked")
public void testIgnoreClientLocality() throws IOException {
ValidateAddBlockResult addBlockResult =
new ValidateAddBlockResult(1024L, 3, (byte) 0x01, null, null, null);
EnumSet<AddBlockFlag> addBlockFlags =
EnumSet.of(AddBlockFlag.IGNORE_CLIENT_LOCALITY);
BlockManager bmMock = mock(BlockManager.class);
ArgumentCaptor<Node> nodeCaptor = ArgumentCaptor.forClass(Node.class);
when(bmMock.chooseTarget4NewBlock(anyString(), anyInt(), any(), anySet(),
anyLong(), anyList(), anyByte(), any(), any(), any())).thenReturn(null);
FSDirWriteFileOp.chooseTargetForNewBlock(bmMock, "localhost", null, null,
addBlockFlags, addBlockResult);
// There should be no other interactions with the block manager when the
// IGNORE_CLIENT_LOCALITY is passed in because there is no need to discover
// the local node requesting the new block
verify(bmMock, times(1)).chooseTarget4NewBlock(anyString(), anyInt(),
nodeCaptor.capture(), anySet(), anyLong(), anyList(), anyByte(), any(),
any(), any());
verifyNoMoreInteractions(bmMock);
assertNull(
"Source node was assigned a value. Expected 'null' value because "
+ "chooseTarget was flagged to ignore source node locality",
nodeCaptor.getValue());
}
}