HDFS-7270. Add congestion signaling capability to DataNode write protocol. Contributed by Haohui Mai.
This commit is contained in:
parent
d27439f839
commit
c4980a2f34
@ -571,6 +571,9 @@ Release 2.7.0 - UNRELEASED
|
|||||||
|
|
||||||
HDFS-7712. Switch blockStateChangeLog to use slf4j. (wang)
|
HDFS-7712. Switch blockStateChangeLog to use slf4j. (wang)
|
||||||
|
|
||||||
|
HDFS-7270. Add congestion signaling capability to DataNode write protocol.
|
||||||
|
(wheat9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
||||||
|
@ -764,4 +764,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final String NNTOP_WINDOWS_MINUTES_KEY =
|
public static final String NNTOP_WINDOWS_MINUTES_KEY =
|
||||||
"dfs.namenode.top.windows.minutes";
|
"dfs.namenode.top.windows.minutes";
|
||||||
public static final String[] NNTOP_WINDOWS_MINUTES_DEFAULT = {"1","5","25"};
|
public static final String[] NNTOP_WINDOWS_MINUTES_DEFAULT = {"1","5","25"};
|
||||||
|
public static final String DFS_PIPELINE_ECN_ENABLED = "dfs.pipeline.ecn";
|
||||||
|
public static final boolean DFS_PIPELINE_ECN_ENABLED_DEFAULT = false;
|
||||||
}
|
}
|
||||||
|
@ -892,7 +892,8 @@ public void run() {
|
|||||||
long seqno = ack.getSeqno();
|
long seqno = ack.getSeqno();
|
||||||
// processes response status from datanodes.
|
// processes response status from datanodes.
|
||||||
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
|
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
|
||||||
final Status reply = ack.getReply(i);
|
final Status reply = PipelineAck.getStatusFromHeader(ack
|
||||||
|
.getReply(i));
|
||||||
// Restart will not be treated differently unless it is
|
// Restart will not be treated differently unless it is
|
||||||
// the local node or the only one in the pipeline.
|
// the local node or the only one in the pipeline.
|
||||||
if (PipelineAck.isRestartOOBStatus(reply) &&
|
if (PipelineAck.isRestartOOBStatus(reply) &&
|
||||||
|
@ -22,17 +22,21 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||||
import com.google.protobuf.TextFormat;
|
import com.google.protobuf.TextFormat;
|
||||||
|
import org.apache.hadoop.hdfs.util.LongBitFormat;
|
||||||
|
|
||||||
/** Pipeline Acknowledgment **/
|
/** Pipeline Acknowledgment **/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@ -46,6 +50,55 @@ public class PipelineAck {
|
|||||||
// place holder for timeout value of each OOB type
|
// place holder for timeout value of each OOB type
|
||||||
final static long[] OOB_TIMEOUT;
|
final static long[] OOB_TIMEOUT;
|
||||||
|
|
||||||
|
public enum ECN {
|
||||||
|
DISABLED(0),
|
||||||
|
SUPPORTED(1),
|
||||||
|
SUPPORTED2(2),
|
||||||
|
CONGESTED(3);
|
||||||
|
|
||||||
|
private final int value;
|
||||||
|
private static final ECN[] VALUES = values();
|
||||||
|
static ECN valueOf(int value) {
|
||||||
|
return VALUES[value];
|
||||||
|
}
|
||||||
|
|
||||||
|
ECN(int value) {
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum StatusFormat {
|
||||||
|
STATUS(null, 4),
|
||||||
|
RESERVED(STATUS.BITS, 1),
|
||||||
|
ECN_BITS(RESERVED.BITS, 2);
|
||||||
|
|
||||||
|
private final LongBitFormat BITS;
|
||||||
|
|
||||||
|
StatusFormat(LongBitFormat prev, int bits) {
|
||||||
|
BITS = new LongBitFormat(name(), prev, bits, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static Status getStatus(int header) {
|
||||||
|
return Status.valueOf((int) STATUS.BITS.retrieve(header));
|
||||||
|
}
|
||||||
|
|
||||||
|
static ECN getECN(int header) {
|
||||||
|
return ECN.valueOf((int) ECN_BITS.BITS.retrieve(header));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int setStatus(int old, Status status) {
|
||||||
|
return (int) STATUS.BITS.combine(status.getNumber(), old);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int setECN(int old, ECN ecn) {
|
||||||
|
return (int) ECN_BITS.BITS.combine(ecn.getValue(), old);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static {
|
static {
|
||||||
OOB_TIMEOUT = new long[NUM_OOB_TYPES];
|
OOB_TIMEOUT = new long[NUM_OOB_TYPES];
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
@ -65,7 +118,7 @@ public PipelineAck() {
|
|||||||
* @param seqno sequence number
|
* @param seqno sequence number
|
||||||
* @param replies an array of replies
|
* @param replies an array of replies
|
||||||
*/
|
*/
|
||||||
public PipelineAck(long seqno, Status[] replies) {
|
public PipelineAck(long seqno, int[] replies) {
|
||||||
this(seqno, replies, 0L);
|
this(seqno, replies, 0L);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,10 +128,15 @@ public PipelineAck(long seqno, Status[] replies) {
|
|||||||
* @param replies an array of replies
|
* @param replies an array of replies
|
||||||
* @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
|
* @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
|
||||||
*/
|
*/
|
||||||
public PipelineAck(long seqno, Status[] replies, long downstreamAckTimeNanos) {
|
public PipelineAck(long seqno, int[] replies,
|
||||||
|
long downstreamAckTimeNanos) {
|
||||||
|
ArrayList<Integer> replyList = Lists.newArrayList();
|
||||||
|
for (int r : replies) {
|
||||||
|
replyList.add(r);
|
||||||
|
}
|
||||||
proto = PipelineAckProto.newBuilder()
|
proto = PipelineAckProto.newBuilder()
|
||||||
.setSeqno(seqno)
|
.setSeqno(seqno)
|
||||||
.addAllStatus(Arrays.asList(replies))
|
.addAllReply(replyList)
|
||||||
.setDownstreamAckTimeNanos(downstreamAckTimeNanos)
|
.setDownstreamAckTimeNanos(downstreamAckTimeNanos)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
@ -96,15 +154,15 @@ public long getSeqno() {
|
|||||||
* @return the number of replies
|
* @return the number of replies
|
||||||
*/
|
*/
|
||||||
public short getNumOfReplies() {
|
public short getNumOfReplies() {
|
||||||
return (short)proto.getStatusCount();
|
return (short)proto.getReplyCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* get the ith reply
|
* get the ith reply
|
||||||
* @return the the ith reply
|
* @return the the ith reply
|
||||||
*/
|
*/
|
||||||
public Status getReply(int i) {
|
public int getReply(int i) {
|
||||||
return proto.getStatus(i);
|
return proto.getReply(i);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -120,8 +178,8 @@ public long getDownstreamAckTimeNanos() {
|
|||||||
* @return true if all statuses are SUCCESS
|
* @return true if all statuses are SUCCESS
|
||||||
*/
|
*/
|
||||||
public boolean isSuccess() {
|
public boolean isSuccess() {
|
||||||
for (Status reply : proto.getStatusList()) {
|
for (int reply : proto.getReplyList()) {
|
||||||
if (reply != Status.SUCCESS) {
|
if (StatusFormat.getStatus(reply) != Status.SUCCESS) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -138,11 +196,12 @@ public Status getOOBStatus() {
|
|||||||
if (getSeqno() != UNKOWN_SEQNO) {
|
if (getSeqno() != UNKOWN_SEQNO) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
for (Status reply : proto.getStatusList()) {
|
for (int reply : proto.getReplyList()) {
|
||||||
// The following check is valid because protobuf guarantees to
|
// The following check is valid because protobuf guarantees to
|
||||||
// preserve the ordering of enum elements.
|
// preserve the ordering of enum elements.
|
||||||
if (reply.getNumber() >= OOB_START && reply.getNumber() <= OOB_END) {
|
Status s = StatusFormat.getStatus(reply);
|
||||||
return reply;
|
if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
|
||||||
|
return s;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
@ -184,4 +243,19 @@ public void write(OutputStream out) throws IOException {
|
|||||||
public String toString() {
|
public String toString() {
|
||||||
return TextFormat.shortDebugString(proto);
|
return TextFormat.shortDebugString(proto);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Status getStatusFromHeader(int header) {
|
||||||
|
return StatusFormat.getStatus(header);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int setStatusForHeader(int old, Status status) {
|
||||||
|
return StatusFormat.setStatus(old, status);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int combineHeader(ECN ecn, Status status) {
|
||||||
|
int header = 0;
|
||||||
|
header = StatusFormat.setStatus(header, status);
|
||||||
|
header = StatusFormat.setECN(header, ecn);
|
||||||
|
return header;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -979,8 +979,6 @@ private static enum PacketResponderType {
|
|||||||
NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
|
NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final Status[] MIRROR_ERROR_STATUS = {Status.SUCCESS, Status.ERROR};
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Processes responses from downstream datanodes in the pipeline
|
* Processes responses from downstream datanodes in the pipeline
|
||||||
* and sends back replies to the originator.
|
* and sends back replies to the originator.
|
||||||
@ -1084,7 +1082,7 @@ void sendOOBResponse(final Status ackStatus) throws IOException,
|
|||||||
LOG.info("Sending an out of band ack of type " + ackStatus);
|
LOG.info("Sending an out of band ack of type " + ackStatus);
|
||||||
try {
|
try {
|
||||||
sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
|
sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
|
||||||
ackStatus);
|
PipelineAck.combineHeader(datanode.getECN(), ackStatus));
|
||||||
} finally {
|
} finally {
|
||||||
// Let others send ack. Unless there are miltiple OOB send
|
// Let others send ack. Unless there are miltiple OOB send
|
||||||
// calls, there can be only one waiter, the responder thread.
|
// calls, there can be only one waiter, the responder thread.
|
||||||
@ -1167,7 +1165,8 @@ public void run() {
|
|||||||
if (oobStatus != null) {
|
if (oobStatus != null) {
|
||||||
LOG.info("Relaying an out of band ack of type " + oobStatus);
|
LOG.info("Relaying an out of band ack of type " + oobStatus);
|
||||||
sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
|
sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
|
||||||
Status.SUCCESS);
|
PipelineAck.combineHeader(datanode.getECN(),
|
||||||
|
Status.SUCCESS));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
seqno = ack.getSeqno();
|
seqno = ack.getSeqno();
|
||||||
@ -1241,9 +1240,10 @@ public void run() {
|
|||||||
finalizeBlock(startTime);
|
finalizeBlock(startTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Status myStatus = pkt != null ? pkt.ackStatus : Status.SUCCESS;
|
||||||
sendAckUpstream(ack, expected, totalAckTimeNanos,
|
sendAckUpstream(ack, expected, totalAckTimeNanos,
|
||||||
(pkt != null ? pkt.offsetInBlock : 0),
|
(pkt != null ? pkt.offsetInBlock : 0),
|
||||||
(pkt != null ? pkt.ackStatus : Status.SUCCESS));
|
PipelineAck.combineHeader(datanode.getECN(), myStatus));
|
||||||
if (pkt != null) {
|
if (pkt != null) {
|
||||||
// remove the packet from the ack queue
|
// remove the packet from the ack queue
|
||||||
removeAckHead();
|
removeAckHead();
|
||||||
@ -1303,11 +1303,11 @@ private void finalizeBlock(long startTime) throws IOException {
|
|||||||
* @param totalAckTimeNanos total ack time including all the downstream
|
* @param totalAckTimeNanos total ack time including all the downstream
|
||||||
* nodes
|
* nodes
|
||||||
* @param offsetInBlock offset in block for the data in packet
|
* @param offsetInBlock offset in block for the data in packet
|
||||||
* @param myStatus the local ack status
|
* @param myHeader the local ack header
|
||||||
*/
|
*/
|
||||||
private void sendAckUpstream(PipelineAck ack, long seqno,
|
private void sendAckUpstream(PipelineAck ack, long seqno,
|
||||||
long totalAckTimeNanos, long offsetInBlock,
|
long totalAckTimeNanos, long offsetInBlock,
|
||||||
Status myStatus) throws IOException {
|
int myHeader) throws IOException {
|
||||||
try {
|
try {
|
||||||
// Wait for other sender to finish. Unless there is an OOB being sent,
|
// Wait for other sender to finish. Unless there is an OOB being sent,
|
||||||
// the responder won't have to wait.
|
// the responder won't have to wait.
|
||||||
@ -1321,7 +1321,7 @@ private void sendAckUpstream(PipelineAck ack, long seqno,
|
|||||||
try {
|
try {
|
||||||
if (!running) return;
|
if (!running) return;
|
||||||
sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,
|
sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,
|
||||||
offsetInBlock, myStatus);
|
offsetInBlock, myHeader);
|
||||||
} finally {
|
} finally {
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
sending = false;
|
sending = false;
|
||||||
@ -1341,32 +1341,34 @@ private void sendAckUpstream(PipelineAck ack, long seqno,
|
|||||||
* @param totalAckTimeNanos total ack time including all the downstream
|
* @param totalAckTimeNanos total ack time including all the downstream
|
||||||
* nodes
|
* nodes
|
||||||
* @param offsetInBlock offset in block for the data in packet
|
* @param offsetInBlock offset in block for the data in packet
|
||||||
* @param myStatus the local ack status
|
* @param myHeader the local ack header
|
||||||
*/
|
*/
|
||||||
private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
|
private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
|
||||||
long totalAckTimeNanos, long offsetInBlock, Status myStatus)
|
long totalAckTimeNanos, long offsetInBlock, int myHeader)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Status[] replies = null;
|
final int[] replies;
|
||||||
if (ack == null) {
|
if (ack == null) {
|
||||||
// A new OOB response is being sent from this node. Regardless of
|
// A new OOB response is being sent from this node. Regardless of
|
||||||
// downstream nodes, reply should contain one reply.
|
// downstream nodes, reply should contain one reply.
|
||||||
replies = new Status[1];
|
replies = new int[] { myHeader };
|
||||||
replies[0] = myStatus;
|
|
||||||
} else if (mirrorError) { // ack read error
|
} else if (mirrorError) { // ack read error
|
||||||
replies = MIRROR_ERROR_STATUS;
|
int h = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS);
|
||||||
|
int h1 = PipelineAck.combineHeader(datanode.getECN(), Status.ERROR);
|
||||||
|
replies = new int[] {h, h1};
|
||||||
} else {
|
} else {
|
||||||
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
|
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
|
||||||
.getNumOfReplies();
|
.getNumOfReplies();
|
||||||
replies = new Status[1 + ackLen];
|
replies = new int[ackLen + 1];
|
||||||
replies[0] = myStatus;
|
replies[0] = myHeader;
|
||||||
for (int i = 0; i < ackLen; i++) {
|
for (int i = 0; i < ackLen; ++i) {
|
||||||
replies[i + 1] = ack.getReply(i);
|
replies[i + 1] = ack.getReply(i);
|
||||||
}
|
}
|
||||||
// If the mirror has reported that it received a corrupt packet,
|
// If the mirror has reported that it received a corrupt packet,
|
||||||
// do self-destruct to mark myself bad, instead of making the
|
// do self-destruct to mark myself bad, instead of making the
|
||||||
// mirror node bad. The mirror is guaranteed to be good without
|
// mirror node bad. The mirror is guaranteed to be good without
|
||||||
// corrupt data on disk.
|
// corrupt data on disk.
|
||||||
if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) {
|
if (ackLen > 0 && PipelineAck.getStatusFromHeader(replies[1]) ==
|
||||||
|
Status.ERROR_CHECKSUM) {
|
||||||
throw new IOException("Shutting down writer and responder "
|
throw new IOException("Shutting down writer and responder "
|
||||||
+ "since the down streams reported the data sent by this "
|
+ "since the down streams reported the data sent by this "
|
||||||
+ "thread is corrupt");
|
+ "thread is corrupt");
|
||||||
@ -1393,6 +1395,7 @@ private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
|
|||||||
|
|
||||||
// If a corruption was detected in the received data, terminate after
|
// If a corruption was detected in the received data, terminate after
|
||||||
// sending ERROR_CHECKSUM back.
|
// sending ERROR_CHECKSUM back.
|
||||||
|
Status myStatus = PipelineAck.getStatusFromHeader(myHeader);
|
||||||
if (myStatus == Status.ERROR_CHECKSUM) {
|
if (myStatus == Status.ERROR_CHECKSUM) {
|
||||||
throw new IOException("Shutting down writer and responder "
|
throw new IOException("Shutting down writer and responder "
|
||||||
+ "due to a checksum error in received data. The error "
|
+ "due to a checksum error in received data. The error "
|
||||||
|
@ -124,6 +124,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
|
||||||
@ -333,6 +334,7 @@ public static InetSocketAddress createSocketAddr(String target) {
|
|||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private final String confVersion;
|
private final String confVersion;
|
||||||
private final long maxNumberOfBlocksToLog;
|
private final long maxNumberOfBlocksToLog;
|
||||||
|
private final boolean pipelineSupportECN;
|
||||||
|
|
||||||
private final List<String> usersWithLocalPathAccess;
|
private final List<String> usersWithLocalPathAccess;
|
||||||
private final boolean connectToDnViaHostname;
|
private final boolean connectToDnViaHostname;
|
||||||
@ -366,6 +368,7 @@ public static InetSocketAddress createSocketAddr(String target) {
|
|||||||
this.connectToDnViaHostname = false;
|
this.connectToDnViaHostname = false;
|
||||||
this.getHdfsBlockLocationsEnabled = false;
|
this.getHdfsBlockLocationsEnabled = false;
|
||||||
this.blockScanner = new BlockScanner(this, conf);
|
this.blockScanner = new BlockScanner(this, conf);
|
||||||
|
this.pipelineSupportECN = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -394,6 +397,9 @@ public static InetSocketAddress createSocketAddr(String target) {
|
|||||||
this.isPermissionEnabled = conf.getBoolean(
|
this.isPermissionEnabled = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
|
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
|
||||||
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
|
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
|
||||||
|
this.pipelineSupportECN = conf.getBoolean(
|
||||||
|
DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
|
||||||
|
DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);
|
||||||
|
|
||||||
confVersion = "core-" +
|
confVersion = "core-" +
|
||||||
conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
|
conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
|
||||||
@ -468,6 +474,19 @@ public Collection<String> getReconfigurableProperties() {
|
|||||||
return reconfigurable;
|
return reconfigurable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The ECN bit for the DataNode. The DataNode should return:
|
||||||
|
* <ul>
|
||||||
|
* <li>ECN.DISABLED when ECN is disabled.</li>
|
||||||
|
* <li>ECN.SUPPORTED when ECN is enabled but the DN still has capacity.</li>
|
||||||
|
* <li>ECN.CONGESTED when ECN is enabled and the DN is congested.</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
public PipelineAck.ECN getECN() {
|
||||||
|
return pipelineSupportECN ? PipelineAck.ECN.SUPPORTED : PipelineAck.ECN
|
||||||
|
.DISABLED;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Contains the StorageLocations for changed data volumes.
|
* Contains the StorageLocations for changed data volumes.
|
||||||
*/
|
*/
|
||||||
|
@ -210,6 +210,7 @@ message PacketHeaderProto {
|
|||||||
optional bool syncBlock = 5 [default = false];
|
optional bool syncBlock = 5 [default = false];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Status is a 4-bit enum
|
||||||
enum Status {
|
enum Status {
|
||||||
SUCCESS = 0;
|
SUCCESS = 0;
|
||||||
ERROR = 1;
|
ERROR = 1;
|
||||||
@ -228,7 +229,7 @@ enum Status {
|
|||||||
|
|
||||||
message PipelineAckProto {
|
message PipelineAckProto {
|
||||||
required sint64 seqno = 1;
|
required sint64 seqno = 1;
|
||||||
repeated Status status = 2;
|
repeated uint32 reply = 2;
|
||||||
optional uint64 downstreamAckTimeNanos = 3 [default = 0];
|
optional uint64 downstreamAckTimeNanos = 3 [default = 0];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,7 +160,9 @@ private void writeZeroLengthPacket(ExtendedBlock block, String description)
|
|||||||
|
|
||||||
//ok finally write a block with 0 len
|
//ok finally write a block with 0 len
|
||||||
sendResponse(Status.SUCCESS, "", null, recvOut);
|
sendResponse(Status.SUCCESS, "", null, recvOut);
|
||||||
new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
|
new PipelineAck(100, new int[] {PipelineAck.combineHeader
|
||||||
|
(PipelineAck.ECN.DISABLED, Status.SUCCESS)}).write
|
||||||
|
(recvOut);
|
||||||
sendRecvData(description, false);
|
sendRecvData(description, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -393,7 +395,8 @@ public void testDataTransferProtocol() throws IOException {
|
|||||||
hdr.write(sendOut);
|
hdr.write(sendOut);
|
||||||
|
|
||||||
sendResponse(Status.SUCCESS, "", null, recvOut);
|
sendResponse(Status.SUCCESS, "", null, recvOut);
|
||||||
new PipelineAck(100, new Status[]{Status.ERROR}).write(recvOut);
|
new PipelineAck(100, new int[] {PipelineAck.combineHeader
|
||||||
|
(PipelineAck.ECN.DISABLED, Status.ERROR)}).write(recvOut);
|
||||||
sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId,
|
sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId,
|
||||||
true);
|
true);
|
||||||
|
|
||||||
@ -414,7 +417,8 @@ public void testDataTransferProtocol() throws IOException {
|
|||||||
sendOut.flush();
|
sendOut.flush();
|
||||||
//ok finally write a block with 0 len
|
//ok finally write a block with 0 len
|
||||||
sendResponse(Status.SUCCESS, "", null, recvOut);
|
sendResponse(Status.SUCCESS, "", null, recvOut);
|
||||||
new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
|
new PipelineAck(100, new int[] {PipelineAck.combineHeader
|
||||||
|
(PipelineAck.ECN.DISABLED, Status.SUCCESS)}).write(recvOut);
|
||||||
sendRecvData("Writing a zero len block blockid " + newBlockId, false);
|
sendRecvData("Writing a zero len block blockid " + newBlockId, false);
|
||||||
|
|
||||||
/* Test OP_READ_BLOCK */
|
/* Test OP_READ_BLOCK */
|
||||||
|
Loading…
Reference in New Issue
Block a user