HDFS-2512. Add textual error message to data transfer protocol responses. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1195693 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-10-31 21:53:58 +00:00
parent 32cad9affe
commit c46876982e
6 changed files with 179 additions and 56 deletions

View File

@ -836,6 +836,9 @@ Release 0.23.0 - Unreleased
HDFS-2436. Change FSNamesystem.setTimes(..) for allowing setting times on
directories. (Uma Maheswara Rao G via szetszwo)
HDFS-2512. Add textual error message to data transfer protocol responses
(todd)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: datatransfer.proto
@ -6936,6 +6935,10 @@ public final class DataTransferProtos {
boolean hasChecksumResponse();
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto getChecksumResponse();
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProtoOrBuilder getChecksumResponseOrBuilder();
// optional string message = 4;
boolean hasMessage();
String getMessage();
}
public static final class BlockOpResponseProto extends
com.google.protobuf.GeneratedMessage
@ -7021,10 +7024,43 @@ public final class DataTransferProtos {
return checksumResponse_;
}
// optional string message = 4;
public static final int MESSAGE_FIELD_NUMBER = 4;
private java.lang.Object message_;
public boolean hasMessage() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
public String getMessage() {
java.lang.Object ref = message_;
if (ref instanceof String) {
return (String) ref;
} else {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
String s = bs.toStringUtf8();
if (com.google.protobuf.Internal.isValidUtf8(bs)) {
message_ = s;
}
return s;
}
}
private com.google.protobuf.ByteString getMessageBytes() {
java.lang.Object ref = message_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8((String) ref);
message_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
private void initFields() {
status_ = org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
firstBadLink_ = "";
checksumResponse_ = org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto.getDefaultInstance();
message_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -7057,6 +7093,9 @@ public final class DataTransferProtos {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeMessage(3, checksumResponse_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBytes(4, getMessageBytes());
}
getUnknownFields().writeTo(output);
}
@ -7078,6 +7117,10 @@ public final class DataTransferProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(3, checksumResponse_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(4, getMessageBytes());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -7116,6 +7159,11 @@ public final class DataTransferProtos {
result = result && getChecksumResponse()
.equals(other.getChecksumResponse());
}
result = result && (hasMessage() == other.hasMessage());
if (hasMessage()) {
result = result && getMessage()
.equals(other.getMessage());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -7137,6 +7185,10 @@ public final class DataTransferProtos {
hash = (37 * hash) + CHECKSUMRESPONSE_FIELD_NUMBER;
hash = (53 * hash) + getChecksumResponse().hashCode();
}
if (hasMessage()) {
hash = (37 * hash) + MESSAGE_FIELD_NUMBER;
hash = (53 * hash) + getMessage().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
@ -7264,6 +7316,8 @@ public final class DataTransferProtos {
checksumResponseBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000004);
message_ = "";
bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@ -7318,6 +7372,10 @@ public final class DataTransferProtos {
} else {
result.checksumResponse_ = checksumResponseBuilder_.build();
}
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
to_bitField0_ |= 0x00000008;
}
result.message_ = message_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -7343,6 +7401,9 @@ public final class DataTransferProtos {
if (other.hasChecksumResponse()) {
mergeChecksumResponse(other.getChecksumResponse());
}
if (other.hasMessage()) {
setMessage(other.getMessage());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -7409,6 +7470,11 @@ public final class DataTransferProtos {
setChecksumResponse(subBuilder.buildPartial());
break;
}
case 34: {
bitField0_ |= 0x00000008;
message_ = input.readBytes();
break;
}
}
}
}
@ -7565,6 +7631,42 @@ public final class DataTransferProtos {
return checksumResponseBuilder_;
}
// optional string message = 4;
private java.lang.Object message_ = "";
public boolean hasMessage() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
public String getMessage() {
java.lang.Object ref = message_;
if (!(ref instanceof String)) {
String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
message_ = s;
return s;
} else {
return (String) ref;
}
}
public Builder setMessage(String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000008;
message_ = value;
onChanged();
return this;
}
public Builder clearMessage() {
bitField0_ = (bitField0_ & ~0x00000008);
message_ = getDefaultInstance().getMessage();
onChanged();
return this;
}
void setMessage(com.google.protobuf.ByteString value) {
bitField0_ |= 0x00000008;
message_ = value;
onChanged();
}
// @@protoc_insertion_point(builder_scope:BlockOpResponseProto)
}
@ -8995,19 +9097,20 @@ public final class DataTransferProtos {
"\030\001 \002(\020\022\r\n\005seqno\030\002 \002(\020\022\031\n\021lastPacketInBlo" +
"ck\030\003 \002(\010\022\017\n\007dataLen\030\004 \002(\017\":\n\020PipelineAck" +
"Proto\022\r\n\005seqno\030\001 \002(\022\022\027\n\006status\030\002 \003(\0162\007.S" +
"tatus\"~\n\024BlockOpResponseProto\022\027\n\006status\030" +
"\001 \002(\0162\007.Status\022\024\n\014firstBadLink\030\002 \001(\t\0227\n\020" +
"checksumResponse\030\003 \001(\0132\035.OpBlockChecksum" +
"ResponseProto\"0\n\025ClientReadStatusProto\022\027" +
"\n\006status\030\001 \002(\0162\007.Status\"-\n\022DNTransferAck" +
"Proto\022\027\n\006status\030\001 \002(\0162\007.Status\"U\n\034OpBloc",
"kChecksumResponseProto\022\023\n\013bytesPerCrc\030\001 " +
"\002(\r\022\023\n\013crcPerBlock\030\002 \002(\004\022\013\n\003md5\030\003 \002(\014*\202\001" +
"\n\006Status\022\013\n\007SUCCESS\020\000\022\t\n\005ERROR\020\001\022\022\n\016ERRO" +
"R_CHECKSUM\020\002\022\021\n\rERROR_INVALID\020\003\022\020\n\014ERROR" +
"_EXISTS\020\004\022\026\n\022ERROR_ACCESS_TOKEN\020\005\022\017\n\013CHE" +
"CKSUM_OK\020\006B>\n%org.apache.hadoop.hdfs.pro" +
"tocol.protoB\022DataTransferProtos\240\001\001"
"tatus\"\217\001\n\024BlockOpResponseProto\022\027\n\006status" +
"\030\001 \002(\0162\007.Status\022\024\n\014firstBadLink\030\002 \001(\t\0227\n" +
"\020checksumResponse\030\003 \001(\0132\035.OpBlockChecksu" +
"mResponseProto\022\017\n\007message\030\004 \001(\t\"0\n\025Clien" +
"tReadStatusProto\022\027\n\006status\030\001 \002(\0162\007.Statu" +
"s\"-\n\022DNTransferAckProto\022\027\n\006status\030\001 \002(\0162",
"\007.Status\"U\n\034OpBlockChecksumResponseProto" +
"\022\023\n\013bytesPerCrc\030\001 \002(\r\022\023\n\013crcPerBlock\030\002 \002" +
"(\004\022\013\n\003md5\030\003 \002(\014*\202\001\n\006Status\022\013\n\007SUCCESS\020\000\022" +
"\t\n\005ERROR\020\001\022\022\n\016ERROR_CHECKSUM\020\002\022\021\n\rERROR_" +
"INVALID\020\003\022\020\n\014ERROR_EXISTS\020\004\022\026\n\022ERROR_ACC" +
"ESS_TOKEN\020\005\022\017\n\013CHECKSUM_OK\020\006B>\n%org.apac" +
"he.hadoop.hdfs.protocol.protoB\022DataTrans" +
"ferProtos\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -9099,7 +9202,7 @@ public final class DataTransferProtos {
internal_static_BlockOpResponseProto_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_BlockOpResponseProto_descriptor,
new java.lang.String[] { "Status", "FirstBadLink", "ChecksumResponse", },
new java.lang.String[] { "Status", "FirstBadLink", "ChecksumResponse", "Message", },
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.class,
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder.class);
internal_static_ClientReadStatusProto_descriptor =

View File

@ -358,7 +358,8 @@ public class Balancer {
if (response.getStatus() != Status.SUCCESS) {
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
throw new IOException("block move failed due to access token error");
throw new IOException("block move is failed");
throw new IOException("block move is failed: " +
response.getMessage());
}
}

View File

@ -48,6 +48,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@ -225,13 +227,14 @@ class DataXceiver extends Receiver implements Runnable {
blockSender = new BlockSender(block, blockOffset, length,
true, true, false, datanode, clientTraceFmt);
} catch(IOException e) {
LOG.info("opReadBlock " + block + " received exception " + e);
sendResponse(s, ERROR, datanode.socketWriteTimeout);
String msg = "opReadBlock " + block + " received exception " + e;
LOG.info(msg);
sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
throw e;
}
// send op status
sendResponse(s, SUCCESS, datanode.socketWriteTimeout);
sendResponse(s, SUCCESS, null, datanode.socketWriteTimeout);
long read = blockSender.sendBlock(out, baseStream, null); // send data
@ -452,7 +455,7 @@ class DataXceiver extends Receiver implements Runnable {
if (LOG.isTraceEnabled()) {
LOG.trace("TRANSFER: send close-ack");
}
writeResponse(SUCCESS, replyOut);
writeResponse(SUCCESS, null, replyOut);
}
}
@ -507,7 +510,7 @@ class DataXceiver extends Receiver implements Runnable {
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
try {
datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
writeResponse(Status.SUCCESS, out);
writeResponse(Status.SUCCESS, null, out);
} finally {
IOUtils.closeStream(out);
}
@ -577,16 +580,17 @@ class DataXceiver extends Receiver implements Runnable {
LOG.warn("Invalid access token in request from " + remoteAddress
+ " for OP_COPY_BLOCK for block " + block + " : "
+ e.getLocalizedMessage());
sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", datanode.socketWriteTimeout);
return;
}
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
LOG.info("Not able to copy block " + block.getBlockId() + " to "
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.");
sendResponse(s, ERROR, datanode.socketWriteTimeout);
String msg = "Not able to copy block " + block.getBlockId() + " to "
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.";
LOG.info(msg);
sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
return;
}
@ -606,7 +610,7 @@ class DataXceiver extends Receiver implements Runnable {
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
// send status first
writeResponse(SUCCESS, reply);
writeResponse(SUCCESS, null, reply);
// send block content to the target
long read = blockSender.sendBlock(reply, baseStream,
dataXceiverServer.balanceThrottler);
@ -653,21 +657,24 @@ class DataXceiver extends Receiver implements Runnable {
LOG.warn("Invalid access token in request from " + remoteAddress
+ " for OP_REPLACE_BLOCK for block " + block + " : "
+ e.getLocalizedMessage());
sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token",
datanode.socketWriteTimeout);
return;
}
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
LOG.warn("Not able to receive block " + block.getBlockId() + " from "
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.");
sendResponse(s, ERROR, datanode.socketWriteTimeout);
String msg = "Not able to receive block " + block.getBlockId() + " from "
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.";
LOG.warn(msg);
sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
return;
}
Socket proxySock = null;
DataOutputStream proxyOut = null;
Status opStatus = SUCCESS;
String errMsg = null;
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
@ -720,7 +727,8 @@ class DataXceiver extends Receiver implements Runnable {
} catch (IOException ioe) {
opStatus = ERROR;
LOG.info("opReplaceBlock " + block + " received exception " + ioe);
errMsg = "opReplaceBlock " + block + " received exception " + ioe;
LOG.info(errMsg);
throw ioe;
} finally {
// receive the last byte that indicates the proxy released its thread resource
@ -736,7 +744,7 @@ class DataXceiver extends Receiver implements Runnable {
// send response back
try {
sendResponse(s, opStatus, datanode.socketWriteTimeout);
sendResponse(s, opStatus, errMsg, datanode.socketWriteTimeout);
} catch (IOException ioe) {
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
}
@ -759,21 +767,22 @@ class DataXceiver extends Receiver implements Runnable {
* @param opStatus status message to write
* @param timeout send timeout
**/
private void sendResponse(Socket s, Status status,
private void sendResponse(Socket s, Status status, String message,
long timeout) throws IOException {
DataOutputStream reply =
new DataOutputStream(NetUtils.getOutputStream(s, timeout));
writeResponse(status, reply);
writeResponse(status, message, reply);
}
private void writeResponse(Status status, OutputStream out)
private void writeResponse(Status status, String message, OutputStream out)
throws IOException {
BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
.setStatus(status)
.build();
response.writeDelimitedTo(out);
BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
.setStatus(status);
if (message != null) {
response.setMessage(message);
}
response.build().writeDelimitedTo(out);
out.flush();
}

View File

@ -119,6 +119,9 @@ message BlockOpResponseProto {
optional string firstBadLink = 2;
optional OpBlockChecksumResponseProto checksumResponse = 3;
/** explanatory text which may be useful to log on the client side */
optional string message = 4;
}
/**

View File

@ -117,10 +117,8 @@ public class TestDataTransferProtocol extends TestCase {
throw eof;
}
LOG.info("Received: " +
StringUtils.byteToHexString(retBuf));
LOG.info("Expected: " +
StringUtils.byteToHexString(recvBuf.toByteArray()));
LOG.info("Received: " +new String(retBuf));
LOG.info("Expected: " + StringUtils.byteToHexString(recvBuf.toByteArray()));
if (eofExpected) {
throw new IOException("Did not recieve IOException when an exception " +
@ -129,10 +127,8 @@ public class TestDataTransferProtocol extends TestCase {
}
byte[] needed = recvBuf.toByteArray();
for (int i=0; i<retBuf.length; i++) {
System.out.print(retBuf[i]);
assertEquals("checking byte[" + i + "]", needed[i], retBuf[i]);
}
assertEquals(StringUtils.byteToHexString(needed),
StringUtils.byteToHexString(retBuf));
} finally {
IOUtils.closeSocket(sock);
}
@ -166,18 +162,22 @@ public class TestDataTransferProtocol extends TestCase {
sendOut.writeInt(0); // zero checksum
//ok finally write a block with 0 len
sendResponse(Status.SUCCESS, "", recvOut);
sendResponse(Status.SUCCESS, "", null, recvOut);
new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
sendRecvData(description, false);
}
private void sendResponse(Status status, String firstBadLink,
String message,
DataOutputStream out)
throws IOException {
Builder builder = BlockOpResponseProto.newBuilder().setStatus(status);
if (firstBadLink != null) {
builder.setFirstBadLink(firstBadLink);
}
if (message != null) {
builder.setMessage(message);
}
builder.build()
.writeDelimitedTo(out);
}
@ -190,11 +190,11 @@ public class TestDataTransferProtocol extends TestCase {
new DatanodeInfo[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS);
if (eofExcepted) {
sendResponse(Status.ERROR, null, recvOut);
sendResponse(Status.ERROR, null, null, recvOut);
sendRecvData(description, true);
} else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
//ok finally write a block with 0 len
sendResponse(Status.SUCCESS, "", recvOut);
sendResponse(Status.SUCCESS, "", null, recvOut);
sendRecvData(description, false);
} else {
writeZeroLengthPacket(block, description);
@ -383,7 +383,7 @@ public class TestDataTransferProtocol extends TestCase {
// bad bytes per checksum
sendOut.writeInt(-1-random.nextInt(oneMil));
recvBuf.reset();
sendResponse(Status.ERROR, null, recvOut);
sendResponse(Status.ERROR, null, null, recvOut);
sendRecvData("wrong bytesPerChecksum while writing", true);
sendBuf.reset();
@ -403,7 +403,7 @@ public class TestDataTransferProtocol extends TestCase {
-1 - random.nextInt(oneMil)); // bad datalen
hdr.write(sendOut);
sendResponse(Status.SUCCESS, "", recvOut);
sendResponse(Status.SUCCESS, "", null, recvOut);
new PipelineAck(100, new Status[]{Status.ERROR}).write(recvOut);
sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId,
true);
@ -428,7 +428,7 @@ public class TestDataTransferProtocol extends TestCase {
sendOut.writeInt(0); // zero checksum
sendOut.flush();
//ok finally write a block with 0 len
sendResponse(Status.SUCCESS, "", recvOut);
sendResponse(Status.SUCCESS, "", null, recvOut);
new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
sendRecvData("Writing a zero len block blockid " + newBlockId, false);
@ -462,7 +462,7 @@ public class TestDataTransferProtocol extends TestCase {
// negative length is ok. Datanode assumes we want to read the whole block.
recvBuf.reset();
sendResponse(Status.SUCCESS, null, recvOut);
sendResponse(Status.SUCCESS, null, null, recvOut);
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, -1L-random.nextInt(oneMil));
@ -471,7 +471,11 @@ public class TestDataTransferProtocol extends TestCase {
// length is more than size of block.
recvBuf.reset();
sendResponse(Status.ERROR, null, recvOut);
sendResponse(Status.ERROR, null,
"opReadBlock " + firstBlock +
" received exception java.io.IOException: " +
"Offset 0 and length 4097 don't match block " + firstBlock + " ( blockLen 4096 )",
recvOut);
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, fileLen+1);