From c46876982ed90d0819a94b518f6135b82334d10d Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Mon, 31 Oct 2011 21:53:58 +0000
Subject: [PATCH] HDFS-2512. Add textual error message to data transfer
 protocol responses. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1195693 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |   3 +
 .../protocol/proto/DataTransferProtos.java         | 133 ++++++++++++++++--
 .../hadoop/hdfs/server/balancer/Balancer.java      |   3 +-
 .../hdfs/server/datanode/DataXceiver.java          |  57 ++++----
 .../hadoop-hdfs/src/proto/datatransfer.proto       |   3 +
 .../hadoop/hdfs/TestDataTransferProtocol.java      |  36 ++---
 6 files changed, 179 insertions(+), 56 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 66fe7f171d..aa5cd5cd58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -836,6 +836,9 @@ Release 0.23.0 - Unreleased
     HDFS-2436. Change FSNamesystem.setTimes(..) for allowing setting times on
     directories. (Uma Maheswara Rao G via szetszwo)
 
+    HDFS-2512. Add textual error message to data transfer protocol responses
+    (todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/DataTransferProtos.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/DataTransferProtos.java
index c1e9f01526..08e15ef88e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/DataTransferProtos.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/proto/DataTransferProtos.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 // Generated by the protocol buffer compiler.  DO NOT EDIT!
 // source: datatransfer.proto
 
@@ -6936,6 +6935,10 @@ public interface BlockOpResponseProtoOrBuilder
     boolean hasChecksumResponse();
     org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto getChecksumResponse();
     org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProtoOrBuilder getChecksumResponseOrBuilder();
+
+    // optional string message = 4;
+    boolean hasMessage();
+    String getMessage();
   }
   public static final class BlockOpResponseProto extends
       com.google.protobuf.GeneratedMessage
@@ -7021,10 +7024,43 @@ public org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumR
       return checksumResponse_;
     }
 
+    // optional string message = 4;
+    public static final int MESSAGE_FIELD_NUMBER = 4;
+    private java.lang.Object message_;
+    public boolean hasMessage() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public String getMessage() {
+      java.lang.Object ref = message_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs =
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          message_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getMessageBytes() {
+      java.lang.Object ref = message_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b =
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        message_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       status_ = org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
       firstBadLink_ = "";
       checksumResponse_ = org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto.getDefaultInstance();
+      message_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -7057,6 +7093,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output)
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeMessage(3, checksumResponse_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeBytes(4, getMessageBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -7078,6 +7117,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(3, checksumResponse_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(4, getMessageBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -7116,6 +7159,11 @@ public boolean equals(final java.lang.Object obj) {
         result = result && getChecksumResponse()
             .equals(other.getChecksumResponse());
       }
+      result = result && (hasMessage() == other.hasMessage());
+      if (hasMessage()) {
+        result = result && getMessage()
+            .equals(other.getMessage());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -7137,6 +7185,10 @@ public int hashCode() {
         hash = (37 * hash) + CHECKSUMRESPONSE_FIELD_NUMBER;
         hash = (53 * hash) + getChecksumResponse().hashCode();
       }
+      if (hasMessage()) {
+        hash = (37 * hash) + MESSAGE_FIELD_NUMBER;
+        hash = (53 * hash) + getMessage().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -7264,6 +7316,8 @@ public Builder clear() {
           checksumResponseBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000004);
+        message_ = "";
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -7318,6 +7372,10 @@ public org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
         } else {
           result.checksumResponse_ = checksumResponseBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.message_ = message_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -7343,6 +7401,9 @@ public Builder mergeFrom(org.apache.hadoop.hdfs.protocol.proto.DataTransferProto
         if (other.hasChecksumResponse()) {
           mergeChecksumResponse(other.getChecksumResponse());
         }
+        if (other.hasMessage()) {
+          setMessage(other.getMessage());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -7409,6 +7470,11 @@ public Builder mergeFrom(
               setChecksumResponse(subBuilder.buildPartial());
               break;
             }
+            case 34: {
+              bitField0_ |= 0x00000008;
+              message_ = input.readBytes();
+              break;
+            }
           }
         }
       }
@@ -7565,6 +7631,42 @@ public org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumR
         return checksumResponseBuilder_;
       }
 
+      // optional string message = 4;
+      private java.lang.Object message_ = "";
+      public boolean hasMessage() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public String getMessage() {
+        java.lang.Object ref = message_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          message_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setMessage(String value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000008;
+        message_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearMessage() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        message_ = getDefaultInstance().getMessage();
+        onChanged();
+        return this;
+      }
+      void setMessage(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000008;
+        message_ = value;
+        onChanged();
+      }
+
       // @@protoc_insertion_point(builder_scope:BlockOpResponseProto)
     }
 
@@ -8995,19 +9097,20 @@ public Builder clearMd5() {
       "\030\001 \002(\020\022\r\n\005seqno\030\002 \002(\020\022\031\n\021lastPacketInBlo" +
       "ck\030\003 \002(\010\022\017\n\007dataLen\030\004 \002(\017\":\n\020PipelineAck" +
       "Proto\022\r\n\005seqno\030\001 \002(\022\022\027\n\006status\030\002 \003(\0162\007.S" +
-      "tatus\"~\n\024BlockOpResponseProto\022\027\n\006status\030" +
-      "\001 \002(\0162\007.Status\022\024\n\014firstBadLink\030\002 \001(\t\0227\n\020" +
-      "checksumResponse\030\003 \001(\0132\035.OpBlockChecksum" +
-      "ResponseProto\"0\n\025ClientReadStatusProto\022\027" +
-      "\n\006status\030\001 \002(\0162\007.Status\"-\n\022DNTransferAck" +
-      "Proto\022\027\n\006status\030\001 \002(\0162\007.Status\"U\n\034OpBloc",
-      "kChecksumResponseProto\022\023\n\013bytesPerCrc\030\001 " +
-      "\002(\r\022\023\n\013crcPerBlock\030\002 \002(\004\022\013\n\003md5\030\003 \002(\014*\202\001" +
-      "\n\006Status\022\013\n\007SUCCESS\020\000\022\t\n\005ERROR\020\001\022\022\n\016ERRO" +
-      "R_CHECKSUM\020\002\022\021\n\rERROR_INVALID\020\003\022\020\n\014ERROR" +
-      "_EXISTS\020\004\022\026\n\022ERROR_ACCESS_TOKEN\020\005\022\017\n\013CHE" +
-      "CKSUM_OK\020\006B>\n%org.apache.hadoop.hdfs.pro" +
-      "tocol.protoB\022DataTransferProtos\240\001\001"
+      "tatus\"\217\001\n\024BlockOpResponseProto\022\027\n\006status" +
+      "\030\001 \002(\0162\007.Status\022\024\n\014firstBadLink\030\002 \001(\t\0227\n" +
+      "\020checksumResponse\030\003 \001(\0132\035.OpBlockChecksu" +
+      "mResponseProto\022\017\n\007message\030\004 \001(\t\"0\n\025Clien" +
+      "tReadStatusProto\022\027\n\006status\030\001 \002(\0162\007.Statu" +
+      "s\"-\n\022DNTransferAckProto\022\027\n\006status\030\001 \002(\0162",
+      "\007.Status\"U\n\034OpBlockChecksumResponseProto" +
+      "\022\023\n\013bytesPerCrc\030\001 \002(\r\022\023\n\013crcPerBlock\030\002 \002" +
+      "(\004\022\013\n\003md5\030\003 \002(\014*\202\001\n\006Status\022\013\n\007SUCCESS\020\000\022" +
+      "\t\n\005ERROR\020\001\022\022\n\016ERROR_CHECKSUM\020\002\022\021\n\rERROR_" +
+      "INVALID\020\003\022\020\n\014ERROR_EXISTS\020\004\022\026\n\022ERROR_ACC" +
+      "ESS_TOKEN\020\005\022\017\n\013CHECKSUM_OK\020\006B>\n%org.apac" +
+      "he.hadoop.hdfs.protocol.protoB\022DataTrans" +
+      "ferProtos\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9099,7 +9202,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors(
           internal_static_BlockOpResponseProto_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_BlockOpResponseProto_descriptor,
-              new java.lang.String[] { "Status", "FirstBadLink", "ChecksumResponse", },
+              new java.lang.String[] { "Status", "FirstBadLink", "ChecksumResponse", "Message", },
               org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.class,
               org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder.class);
           internal_static_ClientReadStatusProto_descriptor =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index b8b6b38509..d29450d6c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -358,7 +358,8 @@ private void receiveResponse(DataInputStream in) throws IOException {
       if (response.getStatus() != Status.SUCCESS) {
         if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
           throw new IOException("block move failed due to access token error");
-        throw new IOException("block move is failed");
+        throw new IOException("block move is failed: " +
+            response.getMessage());
       }
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 2d3c49a3c8..efad2f80e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -48,6 +48,8 @@
 import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProtoOrBuilder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -225,13 +227,14 @@ public void readBlock(final ExtendedBlock block,
       blockSender = new BlockSender(block, blockOffset, length,
           true, true, false, datanode, clientTraceFmt);
     } catch(IOException e) {
-      LOG.info("opReadBlock " + block + " received exception " + e);
-      sendResponse(s, ERROR, datanode.socketWriteTimeout);
+      String msg = "opReadBlock " + block + " received exception " + e;
+      LOG.info(msg);
+      sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
       throw e;
     }
 
     // send op status
-    sendResponse(s, SUCCESS, datanode.socketWriteTimeout);
+    sendResponse(s, SUCCESS, null, datanode.socketWriteTimeout);
 
     long read = blockSender.sendBlock(out, baseStream, null); // send data
 
@@ -452,7 +455,7 @@ public void writeBlock(final ExtendedBlock block,
         if (LOG.isTraceEnabled()) {
           LOG.trace("TRANSFER: send close-ack");
         }
-        writeResponse(SUCCESS, replyOut);
+        writeResponse(SUCCESS, null, replyOut);
       }
     }
 
@@ -507,7 +510,7 @@ public void transferBlock(final ExtendedBlock blk,
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
-      writeResponse(Status.SUCCESS, out);
+      writeResponse(Status.SUCCESS, null, out);
     } finally {
       IOUtils.closeStream(out);
     }
@@ -577,16 +580,17 @@ public void copyBlock(final ExtendedBlock block,
         LOG.warn("Invalid access token in request from " + remoteAddress
             + " for OP_COPY_BLOCK for block " + block + " : "
             + e.getLocalizedMessage());
-        sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
+        sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", datanode.socketWriteTimeout);
         return;
       }
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
-      LOG.info("Not able to copy block " + block.getBlockId() + " to "
-          + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
-      sendResponse(s, ERROR, datanode.socketWriteTimeout);
+      String msg = "Not able to copy block " + block.getBlockId() + " to "
+          + s.getRemoteSocketAddress() + " because threads quota is exceeded.";
+      LOG.info(msg);
+      sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
       return;
     }
 
@@ -606,7 +610,7 @@ public void copyBlock(final ExtendedBlock block,
           baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
 
       // send status first
-      writeResponse(SUCCESS, reply);
+      writeResponse(SUCCESS, null, reply);
       // send block content to the target
       long read = blockSender.sendBlock(reply, baseStream,
           dataXceiverServer.balanceThrottler);
@@ -653,21 +657,24 @@ public void replaceBlock(final ExtendedBlock block,
         LOG.warn("Invalid access token in request from " + remoteAddress
             + " for OP_REPLACE_BLOCK for block " + block + " : "
             + e.getLocalizedMessage());
-        sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
+        sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token",
+            datanode.socketWriteTimeout);
         return;
       }
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
-      LOG.warn("Not able to receive block " + block.getBlockId() + " from "
-          + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
-      sendResponse(s, ERROR, datanode.socketWriteTimeout);
+      String msg = "Not able to receive block " + block.getBlockId() + " from "
+          + s.getRemoteSocketAddress() + " because threads quota is exceeded.";
+      LOG.warn(msg);
+      sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
       return;
     }
 
     Socket proxySock = null;
     DataOutputStream proxyOut = null;
     Status opStatus = SUCCESS;
+    String errMsg = null;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
 
@@ -720,7 +727,8 @@ public void replaceBlock(final ExtendedBlock block,
       }
     } catch (IOException ioe) {
       opStatus = ERROR;
-      LOG.info("opReplaceBlock " + block + " received exception " + ioe);
+      errMsg = "opReplaceBlock " + block + " received exception " + ioe;
+      LOG.info(errMsg);
       throw ioe;
     } finally {
       // receive the last byte that indicates the proxy released its thread resource
@@ -736,7 +744,7 @@ public void replaceBlock(final ExtendedBlock block,
 
       // send response back
       try {
-        sendResponse(s, opStatus, datanode.socketWriteTimeout);
+        sendResponse(s, opStatus, errMsg, datanode.socketWriteTimeout);
       } catch (IOException ioe) {
         LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
       }
@@ -759,21 +767,22 @@ private long elapsed() {
    * @param opStatus status message to write
    * @param timeout send timeout
    **/
-  private void sendResponse(Socket s, Status status,
+  private void sendResponse(Socket s, Status status, String message,
       long timeout) throws IOException {
     DataOutputStream reply = new DataOutputStream(NetUtils.getOutputStream(s, timeout));
-    writeResponse(status, reply);
+    writeResponse(status, message, reply);
   }
 
-  private void writeResponse(Status status, OutputStream out)
+  private void writeResponse(Status status, String message, OutputStream out)
       throws IOException {
-    BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
-      .setStatus(status)
-      .build();
-
-    response.writeDelimitedTo(out);
+    BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
+      .setStatus(status);
+    if (message != null) {
+      response.setMessage(message);
+    }
+    response.build().writeDelimitedTo(out);
     out.flush();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto
index 7c5f859b22..1319ed6e74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/proto/datatransfer.proto
@@ -119,6 +119,9 @@ message BlockOpResponseProto {
 
   optional string firstBadLink = 2;
   optional OpBlockChecksumResponseProto checksumResponse = 3;
+
+  /** explanatory text which may be useful to log on the client side */
+  optional string message = 4;
 }
 
 /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 72faa319b7..9ebecd48bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -117,10 +117,8 @@ private void sendRecvData(String testDescription,
         throw eof;
       }
 
-      LOG.info("Received: " +
-          StringUtils.byteToHexString(retBuf));
-      LOG.info("Expected: " +
-          StringUtils.byteToHexString(recvBuf.toByteArray()));
+      LOG.info("Received: " + new String(retBuf));
+      LOG.info("Expected: " + StringUtils.byteToHexString(recvBuf.toByteArray()));
 
       if (eofExpected) {
         throw new IOException("Did not recieve IOException when an exception " +
@@ -129,10 +127,8 @@
       byte[] needed = recvBuf.toByteArray();
-      for (int i=0; i
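
The wire-level effect of the patch is easiest to see from the consuming side. The sketch below is illustrative only and not part of the commit; the helper class and method names are hypothetical, and it uses only the generated API the patch adds (hasMessage/getMessage) plus protobuf's standard parseDelimitedFrom, which matches the writeDelimitedTo call in DataXceiver.writeResponse. It shows how a peer issuing a block operation can now surface the datanode's explanatory text instead of a bare status code, mirroring the Balancer change above:

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

    // Hypothetical client-side helper, for illustration only.
    class BlockOpResponseCheck {
      static void checkBlockOpStatus(InputStream in, String op) throws IOException {
        // The datanode writes responses with writeDelimitedTo(), so read them
        // back with the matching length-delimited parse.
        BlockOpResponseProto response = BlockOpResponseProto.parseDelimitedFrom(in);
        if (response.getStatus() != Status.SUCCESS) {
          // message is optional: fall back to the numeric status when a
          // datanode predating this change (or one with nothing to add)
          // omits field 4.
          String detail = response.hasMessage()
              ? response.getMessage()
              : "status " + response.getStatus();
          throw new IOException(op + " failed: " + detail);
        }
      }
    }

Because the field is declared optional, old and new peers interoperate: a client built before this change skips unknown field 4 when parsing, and a new client degrades to the status code when the field is absent.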