HDFS-8366. Erasure Coding: Make the timeout parameter of polling blocking queue configurable in DFSStripedOutputStream. Contributed by Li Bo

This commit is contained in:
boli2 2015-05-19 02:14:46 -04:00 committed by Zhe Zhang
parent 7434c44b16
commit 8c95673db4
4 changed files with 51 additions and 5 deletions

View File

@@ -189,6 +189,16 @@ interface StripedRead {
int THREADPOOL_SIZE_DEFAULT = 18; int THREADPOOL_SIZE_DEFAULT = 18;
} }
/**
 * dfs.client.write.striped configuration properties.
 * Timeout values (in seconds) used by DFSStripedOutputStream's Coordinator
 * when polling its blocking queues for located / ended blocks.
 */
interface StripedWrite {
// Key prefix for striped-write settings; extends Write.PREFIX — presumably
// "dfs.client.write." — TODO confirm against the Write interface.
String PREFIX = Write.PREFIX + "striped.";
// Max seconds to wait when polling for a newly located striped block
// (see Coordinator#getStripedBlock).
String MAX_SECONDS_GET_STRIPED_BLOCK_KEY = PREFIX + "max-seconds-get-striped-block";
int MAX_SECONDS_GET_STRIPED_BLOCK_DEFAULT = 90;
// Max seconds to wait when polling for a block ended by a streamer
// (see Coordinator#getEndBlock).
String MAX_SECONDS_GET_ENDED_BLOCK_KEY = PREFIX + "max-seconds-get-ended-block";
int MAX_SECONDS_GET_ENDED_BLOCK_DEFAULT = 60;
}
/** dfs.http.client configuration properties */ /** dfs.http.client configuration properties */
interface HttpClient { interface HttpClient {
String PREFIX = "dfs.http.client."; String PREFIX = "dfs.http.client.";

View File

@@ -225,3 +225,6 @@
(Yi Liu via jing9) (Yi Liu via jing9)
HDFS-8320. Erasure coding: consolidate striping-related terminologies. (zhz) HDFS-8320. Erasure coding: consolidate striping-related terminologies. (zhz)
HDFS-8366. Erasure Coding: Make the timeout parameter of polling blocking queue
configurable in DFSStripedOutputStream. (Li Bo)

View File

@@ -33,6 +33,8 @@
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -61,11 +63,14 @@
public class DFSStripedOutputStream extends DFSOutputStream { public class DFSStripedOutputStream extends DFSOutputStream {
/** Coordinate the communication between the streamers. */ /** Coordinate the communication between the streamers. */
static class Coordinator { static class Coordinator {
private final DfsClientConf conf;
private final List<BlockingQueue<ExtendedBlock>> endBlocks; private final List<BlockingQueue<ExtendedBlock>> endBlocks;
private final List<BlockingQueue<LocatedBlock>> stripedBlocks; private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
private volatile boolean shouldLocateFollowingBlock = false; private volatile boolean shouldLocateFollowingBlock = false;
Coordinator(final int numDataBlocks, final int numAllBlocks) { Coordinator(final DfsClientConf conf, final int numDataBlocks,
final int numAllBlocks) {
this.conf = conf;
endBlocks = new ArrayList<>(numDataBlocks); endBlocks = new ArrayList<>(numDataBlocks);
for (int i = 0; i < numDataBlocks; i++) { for (int i = 0; i < numDataBlocks; i++) {
endBlocks.add(new LinkedBlockingQueue<ExtendedBlock>(1)); endBlocks.add(new LinkedBlockingQueue<ExtendedBlock>(1));
@@ -91,7 +96,9 @@ void putEndBlock(int i, ExtendedBlock block) {
ExtendedBlock getEndBlock(int i) throws InterruptedIOException { ExtendedBlock getEndBlock(int i) throws InterruptedIOException {
try { try {
return endBlocks.get(i).poll(30, TimeUnit.SECONDS); return endBlocks.get(i).poll(
conf.getStripedWriteMaxSecondsGetEndedBlock(),
TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw DFSUtil.toInterruptedIOException( throw DFSUtil.toInterruptedIOException(
"getEndBlock interrupted, i=" + i, e); "getEndBlock interrupted, i=" + i, e);
@@ -121,7 +128,9 @@ void putStripedBlock(int i, LocatedBlock block) throws IOException {
LocatedBlock getStripedBlock(int i) throws IOException { LocatedBlock getStripedBlock(int i) throws IOException {
final LocatedBlock lb; final LocatedBlock lb;
try { try {
lb = stripedBlocks.get(i).poll(90, TimeUnit.SECONDS); lb = stripedBlocks.get(i).poll(
conf.getStripedWriteMaxSecondsGetStripedBlock(),
TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw DFSUtil.toInterruptedIOException("getStripedBlock interrupted", e); throw DFSUtil.toInterruptedIOException("getStripedBlock interrupted", e);
} }
@@ -133,7 +142,7 @@ LocatedBlock getStripedBlock(int i) throws IOException {
} }
} }
/** Buffers for writing the data and parity cells of a strip. */ /** Buffers for writing the data and parity cells of a stripe. */
class CellBuffers { class CellBuffers {
private final ByteBuffer[] buffers; private final ByteBuffer[] buffers;
private final byte[][] checksumArrays; private final byte[][] checksumArrays;
@@ -228,7 +237,7 @@ private StripedDataStreamer getLeadingStreamer() {
encoder = new RSRawEncoder(); encoder = new RSRawEncoder();
encoder.initialize(numDataBlocks, numParityBlocks, cellSize); encoder.initialize(numDataBlocks, numParityBlocks, cellSize);
coordinator = new Coordinator(numDataBlocks, numAllBlocks); coordinator = new Coordinator(dfsClient.getConf(), numDataBlocks, numAllBlocks);
try { try {
cellBuffers = new CellBuffers(numParityBlocks); cellBuffers = new CellBuffers(numParityBlocks);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {

View File

@@ -103,6 +103,9 @@ public class DfsClientConf {
private final int hedgedReadThreadpoolSize; private final int hedgedReadThreadpoolSize;
private final int stripedReadThreadpoolSize; private final int stripedReadThreadpoolSize;
private final int stripedWriteMaxSecondsGetStripedBlock;
private final int stripedWriteMaxSecondsGetEndedBlock;
public DfsClientConf(Configuration conf) { public DfsClientConf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout // The hdfsTimeout is currently the same as the ipc timeout
@@ -225,6 +228,13 @@ public DfsClientConf(Configuration conf) {
Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " + Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " +
HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY + HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY +
" must be greater than 0."); " must be greater than 0.");
stripedWriteMaxSecondsGetStripedBlock = conf.getInt(
HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_STRIPED_BLOCK_KEY,
HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_STRIPED_BLOCK_DEFAULT);
stripedWriteMaxSecondsGetEndedBlock = conf.getInt(
HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_ENDED_BLOCK_KEY,
HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_ENDED_BLOCK_DEFAULT);
} }
private DataChecksum.Type getChecksumType(Configuration conf) { private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -508,6 +518,20 @@ public int getStripedReadThreadpoolSize() {
return stripedReadThreadpoolSize; return stripedReadThreadpoolSize;
} }
/**
 * @return the maximum time, in seconds, that a striped-write coordinator
 * waits when polling its queue for a located striped block; read from
 * {@code HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_STRIPED_BLOCK_KEY}
 * (default 90) in the constructor.
 */
public int getStripedWriteMaxSecondsGetStripedBlock() {
return stripedWriteMaxSecondsGetStripedBlock;
}
/**
 * @return the maximum time, in seconds, that a striped-write coordinator
 * waits when polling its queue for an ended block; read from
 * {@code HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_ENDED_BLOCK_KEY}
 * (default 60) in the constructor.
 */
public int getStripedWriteMaxSecondsGetEndedBlock() {
return stripedWriteMaxSecondsGetEndedBlock;
}
/** /**
* @return the shortCircuitConf * @return the shortCircuitConf
*/ */