HDFS-7439. Add BlockOpResponseProto's message to the exception messages. Contributed by Takanobu Asanuma

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-03-02 15:03:58 +08:00
parent dd9cd0797c
commit 67ed59348d
7 changed files with 55 additions and 62 deletions

View File

@ -694,6 +694,9 @@ Release 2.7.0 - UNRELEASED
HDFS-5853. Add "hadoop.user.group.metrics.percentiles.intervals" to
hdfs-default.xml. (aajisaka)
HDFS-7439. Add BlockOpResponseProto's message to the exception messages.
(Takanobu Asanuma via szetszwo)
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -174,6 +174,7 @@
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
@ -2260,15 +2261,9 @@ public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
if (reply.getStatus() != Status.SUCCESS) {
if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException();
} else {
throw new IOException("Bad response " + reply + " for block "
+ block + " from datanode " + datanodes[j]);
}
}
String logInfo = "for block " + block + " from datanode " + datanodes[j];
DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
OpBlockChecksumResponseProto checksumData =
reply.getChecksumResponse();
@ -2425,16 +2420,9 @@ private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
0, 1, true, CachingStrategy.newDefaultStrategy());
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
if (reply.getStatus() != Status.SUCCESS) {
if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException();
} else {
throw new IOException("Bad response " + reply + " trying to read "
+ lb.getBlock() + " from datanode " + dn);
}
}
String logInfo = "trying to read " + lb.getBlock() + " from datanode " + dn;
DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
} finally {
IOUtils.cleanup(null, pair.in, pair.out);

View File

@ -69,6 +69,7 @@
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
@ -1469,16 +1470,10 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
checkRestart = true;
throw new IOException("A datanode is restarting.");
}
if (pipelineStatus != SUCCESS) {
if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for connect ack with firstBadLink as "
+ firstBadLink);
} else {
throw new IOException("Bad connect ack with firstBadLink as "
+ firstBadLink);
}
}
String logInfo = "ack with firstBadLink as " + firstBadLink;
DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
assert null == blockStream : "Previous blockStream unclosed";
blockStream = out;
result = true; // success

View File

@ -45,7 +45,6 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.net.NetUtils;
@ -448,22 +447,13 @@ static void checkSuccess(
BlockOpResponseProto status, Peer peer,
ExtendedBlock block, String file)
throws IOException {
if (status.getStatus() != Status.SUCCESS) {
if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for OP_READ_BLOCK, self="
+ peer.getLocalAddressString() + ", remote="
+ peer.getRemoteAddressString() + ", for file " + file
+ ", for pool " + block.getBlockPoolId() + " block "
+ block.getBlockId() + "_" + block.getGenerationStamp());
} else {
throw new IOException("Got error for OP_READ_BLOCK, self="
+ peer.getLocalAddressString() + ", remote="
+ peer.getRemoteAddressString() + ", for file " + file
+ ", for pool " + block.getBlockPoolId() + " block "
+ block.getBlockId() + "_" + block.getGenerationStamp());
}
}
String logInfo = "for OP_READ_BLOCK"
+ ", self=" + peer.getLocalAddressString()
+ ", remote=" + peer.getRemoteAddressString()
+ ", for file " + file
+ ", for pool " + block.getBlockPoolId()
+ " block " + block.getBlockId() + "_" + block.getGenerationStamp();
DataTransferProtoUtil.checkBlockOpStatus(status, logInfo);
}
@Override

View File

@ -17,11 +17,16 @@
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
@ -29,6 +34,7 @@
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.htrace.Span;
@ -119,4 +125,24 @@ public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto,
}
return scope;
}
public static void checkBlockOpStatus(
BlockOpResponseProto response,
String logInfo) throws IOException {
if (response.getStatus() != Status.SUCCESS) {
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error"
+ ", status message " + response.getMessage()
+ ", " + logInfo
);
} else {
throw new IOException(
"Got error"
+ ", status message " + response.getMessage()
+ ", " + logInfo
);
}
}
}
}

View File

@ -53,6 +53,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
@ -357,12 +358,8 @@ private void receiveResponse(DataInputStream in) throws IOException {
// read intermediate responses
response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
}
if (response.getStatus() != Status.SUCCESS) {
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new IOException("block move failed due to access token error");
}
throw new IOException("block move is failed: " + response.getMessage());
}
String logInfo = "block move is failed";
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
}
/** reset the object */

View File

@ -1116,16 +1116,10 @@ public void replaceBlock(final ExtendedBlock block,
BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(proxyReply));
if (copyResponse.getStatus() != SUCCESS) {
if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
throw new IOException("Copy block " + block + " from "
+ proxySock.getRemoteSocketAddress()
+ " failed due to access token error");
}
throw new IOException("Copy block " + block + " from "
+ proxySock.getRemoteSocketAddress() + " failed");
}
String logInfo = "copy block " + block + " from "
+ proxySock.getRemoteSocketAddress();
DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo);
// get checksum info about the block we're copying
ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(