HDFS-13882. Set a maximum delay for retrying locateFollowingBlock. Contributed by Kitti Nanasi.
Signed-off-by: Xiao Chen <xiao@apache.org>
This commit is contained in:
parent
90552b1ea8
commit
10185d9a77
@ -942,6 +942,7 @@ protected void completeFile(ExtendedBlock last) throws IOException {
|
|||||||
long localstart = Time.monotonicNow();
|
long localstart = Time.monotonicNow();
|
||||||
final DfsClientConf conf = dfsClient.getConf();
|
final DfsClientConf conf = dfsClient.getConf();
|
||||||
long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
|
long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
|
||||||
|
long maxSleepTime = conf.getBlockWriteLocateFollowingMaxDelayMs();
|
||||||
boolean fileComplete = false;
|
boolean fileComplete = false;
|
||||||
int retries = conf.getNumBlockWriteLocateFollowingRetry();
|
int retries = conf.getNumBlockWriteLocateFollowingRetry();
|
||||||
while (!fileComplete) {
|
while (!fileComplete) {
|
||||||
@ -965,7 +966,7 @@ protected void completeFile(ExtendedBlock last) throws IOException {
|
|||||||
}
|
}
|
||||||
retries--;
|
retries--;
|
||||||
Thread.sleep(sleeptime);
|
Thread.sleep(sleeptime);
|
||||||
sleeptime *= 2;
|
sleeptime = calculateDelayForNextRetry(sleeptime, maxSleepTime);
|
||||||
if (Time.monotonicNow() - localstart > 5000) {
|
if (Time.monotonicNow() - localstart > 5000) {
|
||||||
DFSClient.LOG.info("Could not complete " + src + " retrying...");
|
DFSClient.LOG.info("Could not complete " + src + " retrying...");
|
||||||
}
|
}
|
||||||
@ -1075,6 +1076,7 @@ static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
|
|||||||
final DfsClientConf conf = dfsClient.getConf();
|
final DfsClientConf conf = dfsClient.getConf();
|
||||||
int retries = conf.getNumBlockWriteLocateFollowingRetry();
|
int retries = conf.getNumBlockWriteLocateFollowingRetry();
|
||||||
long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
|
long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
|
||||||
|
long maxSleepTime = conf.getBlockWriteLocateFollowingMaxDelayMs();
|
||||||
long localstart = Time.monotonicNow();
|
long localstart = Time.monotonicNow();
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
@ -1106,7 +1108,7 @@ static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
|
|||||||
LOG.warn("NotReplicatedYetException sleeping " + src
|
LOG.warn("NotReplicatedYetException sleeping " + src
|
||||||
+ " retries left " + retries);
|
+ " retries left " + retries);
|
||||||
Thread.sleep(sleeptime);
|
Thread.sleep(sleeptime);
|
||||||
sleeptime *= 2;
|
sleeptime = calculateDelayForNextRetry(sleeptime, maxSleepTime);
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
LOG.warn("Caught exception", ie);
|
LOG.warn("Caught exception", ie);
|
||||||
}
|
}
|
||||||
@ -1117,4 +1119,19 @@ static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculates the delay for the next retry.
|
||||||
|
*
|
||||||
|
* The delay is increased exponentially until the maximum delay is reached.
|
||||||
|
*
|
||||||
|
* @param previousDelay delay for the previous retry
|
||||||
|
* @param maxDelay maximum delay
|
||||||
|
* @return the minimum of the double of the previous delay
|
||||||
|
* and the maximum delay
|
||||||
|
*/
|
||||||
|
private static long calculateDelayForNextRetry(long previousDelay,
|
||||||
|
long maxDelay) {
|
||||||
|
return Math.min(previousDelay * 2, maxDelay);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -322,6 +322,9 @@ interface BlockWrite {
|
|||||||
String LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY =
|
String LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY =
|
||||||
PREFIX + "locateFollowingBlock.initial.delay.ms";
|
PREFIX + "locateFollowingBlock.initial.delay.ms";
|
||||||
int LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT = 400;
|
int LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT = 400;
|
||||||
|
String LOCATEFOLLOWINGBLOCK_MAX_DELAY_MS_KEY =
|
||||||
|
PREFIX + "locateFollowingBlock.max.delay.ms";
|
||||||
|
int LOCATEFOLLOWINGBLOCK_MAX_DELAY_MS_DEFAULT = 60000;
|
||||||
|
|
||||||
interface ReplaceDatanodeOnFailure {
|
interface ReplaceDatanodeOnFailure {
|
||||||
String PREFIX = BlockWrite.PREFIX + "replace-datanode-on-failure.";
|
String PREFIX = BlockWrite.PREFIX + "replace-datanode-on-failure.";
|
||||||
|
@ -122,6 +122,7 @@ public class DfsClientConf {
|
|||||||
private final int numBlockWriteRetry;
|
private final int numBlockWriteRetry;
|
||||||
private final int numBlockWriteLocateFollowingRetry;
|
private final int numBlockWriteLocateFollowingRetry;
|
||||||
private final int blockWriteLocateFollowingInitialDelayMs;
|
private final int blockWriteLocateFollowingInitialDelayMs;
|
||||||
|
private final int blockWriteLocateFollowingMaxDelayMs;
|
||||||
private final long defaultBlockSize;
|
private final long defaultBlockSize;
|
||||||
private final long prefetchSize;
|
private final long prefetchSize;
|
||||||
private final short defaultReplication;
|
private final short defaultReplication;
|
||||||
@ -237,6 +238,9 @@ public DfsClientConf(Configuration conf) {
|
|||||||
blockWriteLocateFollowingInitialDelayMs = conf.getInt(
|
blockWriteLocateFollowingInitialDelayMs = conf.getInt(
|
||||||
BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY,
|
BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY,
|
||||||
BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT);
|
BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT);
|
||||||
|
blockWriteLocateFollowingMaxDelayMs = conf.getInt(
|
||||||
|
BlockWrite.LOCATEFOLLOWINGBLOCK_MAX_DELAY_MS_KEY,
|
||||||
|
BlockWrite.LOCATEFOLLOWINGBLOCK_MAX_DELAY_MS_DEFAULT);
|
||||||
uMask = FsPermission.getUMask(conf);
|
uMask = FsPermission.getUMask(conf);
|
||||||
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
|
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
|
||||||
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
||||||
@ -349,6 +353,10 @@ public int getBlockWriteLocateFollowingInitialDelayMs() {
|
|||||||
return blockWriteLocateFollowingInitialDelayMs;
|
return blockWriteLocateFollowingInitialDelayMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getBlockWriteLocateFollowingMaxDelayMs() {
|
||||||
|
return blockWriteLocateFollowingMaxDelayMs;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the hdfsTimeout
|
* @return the hdfsTimeout
|
||||||
*/
|
*/
|
||||||
|
@ -3135,7 +3135,18 @@
|
|||||||
<name>dfs.client.block.write.locateFollowingBlock.initial.delay.ms</name>
|
<name>dfs.client.block.write.locateFollowingBlock.initial.delay.ms</name>
|
||||||
<value>400</value>
|
<value>400</value>
|
||||||
<description>The initial delay (unit is ms) for locateFollowingBlock,
|
<description>The initial delay (unit is ms) for locateFollowingBlock,
|
||||||
the delay time will increase exponentially(double) for each retry.
|
the delay time will increase exponentially(double) for each retry
|
||||||
|
until dfs.client.block.write.locateFollowingBlock.max.delay.ms is reached,
|
||||||
|
after that the delay for each retry will be
|
||||||
|
dfs.client.block.write.locateFollowingBlock.max.delay.ms.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.client.block.write.locateFollowingBlock.max.delay.ms</name>
|
||||||
|
<value>60000</value>
|
||||||
|
<description>
|
||||||
|
The maximum delay (unit is ms) before retrying locateFollowingBlock.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
@ -1229,27 +1229,54 @@ static void parseMultipleLinearRandomRetry(String expected, String s) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests default configuration values and configuration setting
|
||||||
|
* of locate following block delays and number of retries.
|
||||||
|
*
|
||||||
|
* Configuration values tested:
|
||||||
|
* - dfs.client.block.write.locateFollowingBlock.initial.delay.ms
|
||||||
|
* - dfs.client.block.write.locateFollowingBlock.max.delay.ms
|
||||||
|
* - dfs.client.block.write.locateFollowingBlock.retries
|
||||||
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testDFSClientConfigurationLocateFollowingBlockInitialDelay()
|
public void testDFSClientConfigurationLocateFollowingBlock()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
// test if HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY
|
|
||||||
// is not configured, verify DFSClient uses the default value 400.
|
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
|
final int initialDelayTestValue = 1000;
|
||||||
|
final int maxDelayTestValue = 35000;
|
||||||
|
final int retryTestValue = 7;
|
||||||
|
|
||||||
|
final int defaultInitialDelay = 400;
|
||||||
|
final int defaultMaxDelay = 60000;
|
||||||
|
final int defultRetry = 5;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
NamenodeProtocols nn = cluster.getNameNodeRpc();
|
NamenodeProtocols nn = cluster.getNameNodeRpc();
|
||||||
DFSClient client = new DFSClient(null, nn, conf, null);
|
DFSClient client = new DFSClient(null, nn, conf, null);
|
||||||
assertEquals(client.getConf().
|
assertEquals(defaultInitialDelay, client.getConf().
|
||||||
getBlockWriteLocateFollowingInitialDelayMs(), 400);
|
getBlockWriteLocateFollowingInitialDelayMs());
|
||||||
|
assertEquals(defaultMaxDelay, client.getConf().
|
||||||
|
getBlockWriteLocateFollowingMaxDelayMs());
|
||||||
|
assertEquals(defultRetry, client.getConf().
|
||||||
|
getNumBlockWriteLocateFollowingRetry());
|
||||||
|
|
||||||
// change HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY,
|
|
||||||
// verify DFSClient uses the configured value 1000.
|
|
||||||
conf.setInt(
|
conf.setInt(
|
||||||
HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY,
|
HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY,
|
||||||
1000);
|
initialDelayTestValue);
|
||||||
|
conf.setInt(
|
||||||
|
HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_MAX_DELAY_MS_KEY,
|
||||||
|
maxDelayTestValue);
|
||||||
|
conf.setInt(
|
||||||
|
HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
|
||||||
|
retryTestValue);
|
||||||
client = new DFSClient(null, nn, conf, null);
|
client = new DFSClient(null, nn, conf, null);
|
||||||
assertEquals(client.getConf().
|
assertEquals(initialDelayTestValue, client.getConf().
|
||||||
getBlockWriteLocateFollowingInitialDelayMs(), 1000);
|
getBlockWriteLocateFollowingInitialDelayMs());
|
||||||
|
assertEquals(maxDelayTestValue, client.getConf().
|
||||||
|
getBlockWriteLocateFollowingMaxDelayMs());
|
||||||
|
assertEquals(retryTestValue, client.getConf().
|
||||||
|
getNumBlockWriteLocateFollowingRetry());
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -43,6 +43,7 @@ public void initializeMemberVariables() {
|
|||||||
configurationClasses = new Class[] { HdfsClientConfigKeys.class,
|
configurationClasses = new Class[] { HdfsClientConfigKeys.class,
|
||||||
HdfsClientConfigKeys.Failover.class,
|
HdfsClientConfigKeys.Failover.class,
|
||||||
HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class,
|
HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class,
|
||||||
|
HdfsClientConfigKeys.BlockWrite.class,
|
||||||
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };
|
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };
|
||||||
|
|
||||||
// Set error modes
|
// Set error modes
|
||||||
|
Loading…
Reference in New Issue
Block a user