From 2f48fae72aa52e6ec42264cad24fab36b6a426c2 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Thu, 23 Jun 2011 23:57:18 +0000 Subject: [PATCH] HDFS-2087. Declare methods in DataTransferProtocol interface, and change Sender and Receiver to implement the interface. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1139124 13f79535-47bb-0310-9956-ffa450edef68 --- hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/BlockReader.java | 7 +- .../org/apache/hadoop/hdfs/DFSClient.java | 2 +- .../apache/hadoop/hdfs/DFSOutputStream.java | 11 +- .../datatransfer/DataTransferProtocol.java | 105 ++++++++++++++- .../hdfs/protocol/datatransfer/Receiver.java | 123 +++++------------- .../hdfs/protocol/datatransfer/Sender.java | 88 +++++++------ .../hadoop/hdfs/server/balancer/Balancer.java | 4 +- .../hadoop/hdfs/server/datanode/DataNode.java | 4 +- .../hdfs/server/datanode/DataXceiver.java | 100 +++++++------- .../server/datanode/DataXceiverServer.java | 16 ++- .../datanode/DataTransferProtocolAspects.aj | 2 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 4 +- .../hadoop/hdfs/TestDataTransferProtocol.java | 58 +++++---- .../server/datanode/TestBlockReplacement.java | 4 +- .../hdfs/server/datanode/TestDiskError.java | 8 +- 16 files changed, 295 insertions(+), 244 deletions(-) diff --git a/hdfs/CHANGES.txt b/hdfs/CHANGES.txt index a0274918ef..02aade46a2 100644 --- a/hdfs/CHANGES.txt +++ b/hdfs/CHANGES.txt @@ -526,6 +526,9 @@ Trunk (unreleased changes) HDFS-2092. Remove some object references to Configuration in DFSClient. (Bharath Mundlapudi via szetszwo) + HDFS-2087. Declare methods in DataTransferProtocol interface, and change + Sender and Receiver to implement the interface. (szetszwo) + OPTIMIZATIONS HDFS-1458. Improve checkpoint performance by avoiding unnecessary image diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java b/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java index 67bd6f8fea..791f5cd800 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java @@ -404,10 +404,9 @@ public static BlockReader newBlockReader( Socket sock, String file, String clientName) throws IOException { // in and out will be closed when sock is closed (by the caller) - Sender.opReadBlock( - new DataOutputStream(new BufferedOutputStream( - NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))), - block, startOffset, len, clientName, blockToken); + final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( + NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT))); + new Sender(out).readBlock(block, blockToken, clientName, startOffset, len); // // Get bytes in block, set streams diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java b/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java index a29cf88b8a..6dd85d6556 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1164,7 +1164,7 @@ public static MD5MD5CRC32FileChecksum getFileChecksum(String src, + Op.BLOCK_CHECKSUM + ", block=" + block); } // get block MD5 - Sender.opBlockChecksum(out, block, lb.getBlockToken()); + new Sender(out).blockChecksum(block, lb.getBlockToken()); final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in)); diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 0d120da33e..18543b8674 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -846,8 +846,8 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, DataNode.SMALL_BUFFER_SIZE)); //send the TRANSFER_BLOCK request - Sender.opTransferBlock(out, block, - dfsClient.clientName, targets, blockToken); + new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, + targets); //ack in = new DataInputStream(NetUtils.getInputStream(sock)); @@ -1019,10 +1019,9 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS, blockReplyStream = new DataInputStream(NetUtils.getInputStream(s)); // send the request - Sender.opWriteBlock(out, block, - nodes.length, recoveryFlag ? stage.getRecoveryStage() : stage, newGS, - block.getNumBytes(), bytesSent, dfsClient.clientName, null, nodes, - accessToken); + new Sender(out).writeBlock(block, accessToken, dfsClient.clientName, + nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, + nodes.length, block.getNumBytes(), bytesSent, newGS); checksum.writeHeader(out); out.flush(); diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index 3ea3016a22..4faab3e045 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -17,10 +17,16 @@ */ package org.apache.hadoop.hdfs.protocol.datatransfer; +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; /** * Transfer data to/from datanode using a streaming protocol. @@ -35,8 +41,101 @@ public interface DataTransferProtocol { * when protocol changes. It is not very obvious. */ /* - * Version 27: - * Move DataTransferProtocol and the inner classes to a package. + * Version 28: + * Declare methods in DataTransferProtocol interface. */ - public static final int DATA_TRANSFER_VERSION = 27; + public static final int DATA_TRANSFER_VERSION = 28; + + /** + * Read a block. + * + * @param blk the block being read. + * @param blockToken security token for accessing the block. + * @param clientName client's name. + * @param blockOffset offset of the block. + * @param length maximum number of bytes for this read. + */ + public void readBlock(final ExtendedBlock blk, + final Token blockToken, + final String clientName, + final long blockOffset, + final long length) throws IOException; + + /** + * Write a block to a datanode pipeline. + * + * @param blk the block being written. + * @param blockToken security token for accessing the block. + * @param clientName client's name. + * @param targets target datanodes in the pipeline. + * @param source source datanode. + * @param stage pipeline stage. + * @param pipelineSize the size of the pipeline. + * @param minBytesRcvd minimum number of bytes received. + * @param maxBytesRcvd maximum number of bytes received. + * @param latestGenerationStamp the latest generation stamp of the block. + */ + public void writeBlock(final ExtendedBlock blk, + final Token blockToken, + final String clientName, + final DatanodeInfo[] targets, + final DatanodeInfo source, + final BlockConstructionStage stage, + final int pipelineSize, + final long minBytesRcvd, + final long maxBytesRcvd, + final long latestGenerationStamp) throws IOException; + + /** + * Transfer a block to another datanode. + * The block stage must be + * either {@link BlockConstructionStage#TRANSFER_RBW} + * or {@link BlockConstructionStage#TRANSFER_FINALIZED}. + * + * @param blk the block being transferred. + * @param blockToken security token for accessing the block. + * @param clientName client's name. + * @param targets target datanodes. + */ + public void transferBlock(final ExtendedBlock blk, + final Token blockToken, + final String clientName, + final DatanodeInfo[] targets) throws IOException; + + /** + * Receive a block from a source datanode + * and then notifies the namenode + * to remove the copy from the original datanode. + * Note that the source datanode and the original datanode can be different. + * It is used for balancing purpose. + * + * @param blk the block being replaced. + * @param blockToken security token for accessing the block. + * @param delHint the hint for deleting the block in the original datanode. + * @param source the source datanode for receiving the block. + */ + public void replaceBlock(final ExtendedBlock blk, + final Token blockToken, + final String delHint, + final DatanodeInfo source) throws IOException; + + /** + * Copy a block. + * It is used for balancing purpose. + * + * @param blk the block being copied. + * @param blockToken security token for accessing the block. + */ + public void copyBlock(final ExtendedBlock blk, + final Token blockToken) throws IOException; + + /** + * Get block checksum (MD5 of CRC32). + * + * @param blk a block. + * @param blockToken security token for accessing the block. + * @throws IOException + */ + public void blockChecksum(final ExtendedBlock blk, + final Token blockToken) throws IOException; } diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index c17b8125ad..dff31e00c6 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -27,23 +27,26 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.security.token.Token; /** Receiver */ @InterfaceAudience.Private @InterfaceStability.Evolving -public abstract class Receiver { +public abstract class Receiver implements DataTransferProtocol { + protected final DataInputStream in; + + /** Create a receiver for DataTransferProtocol with a socket. */ + protected Receiver(final DataInputStream in) { + this.in = in; + } + /** Read an Op. It also checks protocol version. */ - protected final Op readOp(DataInputStream in) throws IOException { + protected final Op readOp() throws IOException { final short version = in.readShort(); if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) { throw new IOException( "Version Mismatch (Expected: " + @@ -54,11 +57,10 @@ protected final Op readOp(DataInputStream in) throws IOException { } /** Process op by the corresponding method. */ - protected final void processOp(Op op, DataInputStream in - ) throws IOException { + protected final void processOp(Op op) throws IOException { switch(op) { case READ_BLOCK: - opReadBlock(in); + opReadBlock(); break; case WRITE_BLOCK: opWriteBlock(in); @@ -81,121 +83,60 @@ protected final void processOp(Op op, DataInputStream in } /** Receive OP_READ_BLOCK */ - private void opReadBlock(DataInputStream in) throws IOException { + private void opReadBlock() throws IOException { OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in)); - - ExtendedBlock b = fromProto( - proto.getHeader().getBaseHeader().getBlock()); - Token token = fromProto( - proto.getHeader().getBaseHeader().getToken()); - - opReadBlock(in, b, proto.getOffset(), proto.getLen(), - proto.getHeader().getClientName(), token); + readBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()), + fromProto(proto.getHeader().getBaseHeader().getToken()), + proto.getHeader().getClientName(), + proto.getOffset(), + proto.getLen()); } - /** - * Abstract OP_READ_BLOCK method. Read a block. - */ - protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk, - long offset, long length, String client, - Token blockToken) throws IOException; /** Receive OP_WRITE_BLOCK */ private void opWriteBlock(DataInputStream in) throws IOException { final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in)); - opWriteBlock(in, - fromProto(proto.getHeader().getBaseHeader().getBlock()), - proto.getPipelineSize(), - fromProto(proto.getStage()), - proto.getLatestGenerationStamp(), - proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(), + writeBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()), + fromProto(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), - fromProto(proto.getSource()), fromProtos(proto.getTargetsList()), - fromProto(proto.getHeader().getBaseHeader().getToken())); + fromProto(proto.getSource()), + fromProto(proto.getStage()), + proto.getPipelineSize(), + proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(), + proto.getLatestGenerationStamp()); } - /** - * Abstract OP_WRITE_BLOCK method. - * Write a block. - */ - protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk, - int pipelineSize, BlockConstructionStage stage, long newGs, - long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src, - DatanodeInfo[] targets, Token blockToken) - throws IOException; - /** Receive {@link Op#TRANSFER_BLOCK} */ private void opTransferBlock(DataInputStream in) throws IOException { final OpTransferBlockProto proto = OpTransferBlockProto.parseFrom(vintPrefixed(in)); - - opTransferBlock(in, - fromProto(proto.getHeader().getBaseHeader().getBlock()), + transferBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()), + fromProto(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), - fromProtos(proto.getTargetsList()), - fromProto(proto.getHeader().getBaseHeader().getToken())); + fromProtos(proto.getTargetsList())); } - /** - * Abstract {@link Op#TRANSFER_BLOCK} method. - * For {@link BlockConstructionStage#TRANSFER_RBW} - * or {@link BlockConstructionStage#TRANSFER_FINALIZED}. - */ - protected abstract void opTransferBlock(DataInputStream in, ExtendedBlock blk, - String client, DatanodeInfo[] targets, - Token blockToken) - throws IOException; - /** Receive OP_REPLACE_BLOCK */ private void opReplaceBlock(DataInputStream in) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in)); - - opReplaceBlock(in, - fromProto(proto.getHeader().getBlock()), + replaceBlock(fromProto(proto.getHeader().getBlock()), + fromProto(proto.getHeader().getToken()), proto.getDelHint(), - fromProto(proto.getSource()), - fromProto(proto.getHeader().getToken())); + fromProto(proto.getSource())); } - /** - * Abstract OP_REPLACE_BLOCK method. - * It is used for balancing purpose; send to a destination - */ - protected abstract void opReplaceBlock(DataInputStream in, - ExtendedBlock blk, String delHint, DatanodeInfo src, - Token blockToken) throws IOException; - /** Receive OP_COPY_BLOCK */ private void opCopyBlock(DataInputStream in) throws IOException { OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in)); - - opCopyBlock(in, - fromProto(proto.getHeader().getBlock()), + copyBlock(fromProto(proto.getHeader().getBlock()), fromProto(proto.getHeader().getToken())); } - /** - * Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to - * a proxy source. - */ - protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk, - Token blockToken) - throws IOException; - /** Receive OP_BLOCK_CHECKSUM */ private void opBlockChecksum(DataInputStream in) throws IOException { OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in)); - opBlockChecksum(in, - fromProto(proto.getHeader().getBlock()), + blockChecksum(fromProto(proto.getHeader().getBlock()), fromProto(proto.getHeader().getToken())); } - - /** - * Abstract OP_BLOCK_CHECKSUM method. - * Get the checksum of a block - */ - protected abstract void opBlockChecksum(DataInputStream in, - ExtendedBlock blk, Token blockToken) - throws IOException; } diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index 9502ad4d70..88288a2a6d 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -44,7 +44,14 @@ /** Sender */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class Sender { +public class Sender implements DataTransferProtocol { + private final DataOutputStream out; + + /** Create a sender for DataTransferProtocol with a output stream. */ + public Sender(final DataOutputStream out) { + this.out = out; + } + /** Initialize a operation. */ private static void op(final DataOutput out, final Op op ) throws IOException { @@ -59,79 +66,85 @@ private static void send(final DataOutputStream out, final Op opcode, out.flush(); } - /** Send OP_READ_BLOCK */ - public static void opReadBlock(DataOutputStream out, ExtendedBlock blk, - long blockOffset, long blockLen, String clientName, - Token blockToken) - throws IOException { + @Override + public void readBlock(final ExtendedBlock blk, + final Token blockToken, + final String clientName, + final long blockOffset, + final long length) throws IOException { OpReadBlockProto proto = OpReadBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken)) .setOffset(blockOffset) - .setLen(blockLen) + .setLen(length) .build(); send(out, Op.READ_BLOCK, proto); } - /** Send OP_WRITE_BLOCK */ - public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk, - int pipelineSize, BlockConstructionStage stage, long newGs, - long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src, - DatanodeInfo[] targets, Token blockToken) - throws IOException { - ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(blk, client, - blockToken); + @Override + public void writeBlock(final ExtendedBlock blk, + final Token blockToken, + final String clientName, + final DatanodeInfo[] targets, + final DatanodeInfo source, + final BlockConstructionStage stage, + final int pipelineSize, + final long minBytesRcvd, + final long maxBytesRcvd, + final long latestGenerationStamp) throws IOException { + ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader( + blk, clientName, blockToken); OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() .setHeader(header) - .addAllTargets( - toProtos(targets, 1)) + .addAllTargets(toProtos(targets, 1)) .setStage(toProto(stage)) .setPipelineSize(pipelineSize) .setMinBytesRcvd(minBytesRcvd) .setMaxBytesRcvd(maxBytesRcvd) - .setLatestGenerationStamp(newGs); + .setLatestGenerationStamp(latestGenerationStamp); - if (src != null) { - proto.setSource(toProto(src)); + if (source != null) { + proto.setSource(toProto(source)); } send(out, Op.WRITE_BLOCK, proto.build()); } - /** Send {@link Op#TRANSFER_BLOCK} */ - public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk, - String client, DatanodeInfo[] targets, - Token blockToken) throws IOException { + @Override + public void transferBlock(final ExtendedBlock blk, + final Token blockToken, + final String clientName, + final DatanodeInfo[] targets) throws IOException { OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildClientHeader( - blk, client, blockToken)) + blk, clientName, blockToken)) .addAllTargets(toProtos(targets, 0)) .build(); send(out, Op.TRANSFER_BLOCK, proto); } - /** Send OP_REPLACE_BLOCK */ - public static void opReplaceBlock(DataOutputStream out, - ExtendedBlock blk, String delHint, DatanodeInfo src, - Token blockToken) throws IOException { + @Override + public void replaceBlock(final ExtendedBlock blk, + final Token blockToken, + final String delHint, + final DatanodeInfo source) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) .setDelHint(delHint) - .setSource(toProto(src)) + .setSource(toProto(source)) .build(); send(out, Op.REPLACE_BLOCK, proto); } - /** Send OP_COPY_BLOCK */ - public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk, - Token blockToken) - throws IOException { + @Override + public void copyBlock(final ExtendedBlock blk, + final Token blockToken) throws IOException { OpCopyBlockProto proto = OpCopyBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) .build(); @@ -139,10 +152,9 @@ public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk, send(out, Op.COPY_BLOCK, proto); } - /** Send OP_BLOCK_CHECKSUM */ - public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk, - Token blockToken) - throws IOException { + @Override + public void blockChecksum(final ExtendedBlock blk, + final Token blockToken) throws IOException { OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) .build(); diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index a5ecf85fe2..08af0fb9ff 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -348,8 +348,8 @@ private void dispatch() { private void sendRequest(DataOutputStream out) throws IOException { final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock()); final Token accessToken = nnc.getAccessToken(eb); - Sender.opReplaceBlock(out, eb, source.getStorageID(), - proxySource.getDatanode(), accessToken); + new Sender(out).replaceBlock(eb, accessToken, + source.getStorageID(), proxySource.getDatanode()); } /* Receive a block copy response from the input stream */ diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 816ab24a9c..d83d0fccb2 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1977,8 +1977,8 @@ public void run() { EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)); } - Sender.opWriteBlock(out, - b, 0, stage, 0, 0, 0, clientname, srcNode, targets, accessToken); + new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode, + stage, 0, 0, 0, 0); // send data & checksum blockSender.sendBlock(out, baseStream, null); diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 384ef12941..46d547f320 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -85,7 +85,10 @@ class DataXceiver extends Receiver implements Runnable, FSConstants { private long opStartTime; //the start time of receiving an Op public DataXceiver(Socket s, DataNode datanode, - DataXceiverServer dataXceiverServer) { + DataXceiverServer dataXceiverServer) throws IOException { + super(new DataInputStream(new BufferedInputStream( + NetUtils.getInputStream(s), FSConstants.SMALL_BUFFER_SIZE))); + this.s = s; this.isLocal = s.getInetAddress().equals(s.getLocalAddress()); this.datanode = datanode; @@ -127,13 +130,9 @@ private void updateCurrentThreadName(String status) { public void run() { updateCurrentThreadName("Waiting for operation"); - DataInputStream in=null; int opsProcessed = 0; Op op = null; try { - in = new DataInputStream( - new BufferedInputStream(NetUtils.getInputStream(s), - SMALL_BUFFER_SIZE)); int stdTimeout = s.getSoTimeout(); // We process requests in a loop, and stay around for a short timeout. @@ -145,7 +144,7 @@ public void run() { assert socketKeepaliveTimeout > 0; s.setSoTimeout(socketKeepaliveTimeout); } - op = readOp(in); + op = readOp(); } catch (InterruptedIOException ignored) { // Time out while we wait for client rpc break; @@ -176,7 +175,7 @@ public void run() { } opStartTime = now(); - processOp(op, in); + processOp(op); ++opsProcessed; } while (!s.isClosed() && socketKeepaliveTimeout > 0); } catch (Throwable t) { @@ -196,13 +195,12 @@ public void run() { } } - /** - * Read a block from the disk. - */ @Override - protected void opReadBlock(DataInputStream in, ExtendedBlock block, - long startOffset, long length, String clientName, - Token blockToken) throws IOException { + public void readBlock(final ExtendedBlock block, + final Token blockToken, + final String clientName, + final long blockOffset, + final long length) throws IOException { OutputStream baseStream = NetUtils.getOutputStream(s, datanode.socketWriteTimeout); DataOutputStream out = new DataOutputStream( @@ -225,7 +223,7 @@ protected void opReadBlock(DataInputStream in, ExtendedBlock block, updateCurrentThreadName("Sending block " + block); try { try { - blockSender = new BlockSender(block, startOffset, length, + blockSender = new BlockSender(block, blockOffset, length, true, true, false, datanode, clientTraceFmt); } catch(IOException e) { LOG.info("opReadBlock " + block + " received exception " + e); @@ -284,16 +282,17 @@ protected void opReadBlock(DataInputStream in, ExtendedBlock block, datanode.metrics.incrReadsFromClient(isLocal); } - /** - * Write a block to disk. - */ @Override - protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block, - final int pipelineSize, final BlockConstructionStage stage, - final long newGs, final long minBytesRcvd, final long maxBytesRcvd, - final String clientname, final DatanodeInfo srcDataNode, - final DatanodeInfo[] targets, final Token blockToken - ) throws IOException { + public void writeBlock(final ExtendedBlock block, + final Token blockToken, + final String clientname, + final DatanodeInfo[] targets, + final DatanodeInfo srcDataNode, + final BlockConstructionStage stage, + final int pipelineSize, + final long minBytesRcvd, + final long maxBytesRcvd, + final long latestGenerationStamp) throws IOException { updateCurrentThreadName("Receiving block " + block + " client=" + clientname); final boolean isDatanode = clientname.length() == 0; final boolean isClient = !isDatanode; @@ -308,7 +307,7 @@ protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block, if (LOG.isDebugEnabled()) { LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname - + "\n block =" + block + ", newGs=" + newGs + + "\n block =" + block + ", newGs=" + latestGenerationStamp + ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]" + "\n targets=" + Arrays.asList(targets) + "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode @@ -351,10 +350,10 @@ protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block, blockReceiver = new BlockReceiver(block, in, s.getRemoteSocketAddress().toString(), s.getLocalSocketAddress().toString(), - stage, newGs, minBytesRcvd, maxBytesRcvd, + stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, datanode); } else { - datanode.data.recoverClose(block, newGs, minBytesRcvd); + datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd); } // @@ -380,9 +379,9 @@ protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block, SMALL_BUFFER_SIZE)); mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock)); - Sender.opWriteBlock(mirrorOut, originalBlock, - pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname, - srcDataNode, targets, blockToken); + new Sender(mirrorOut).writeBlock(originalBlock, blockToken, + clientname, targets, srcDataNode, stage, pipelineSize, + minBytesRcvd, maxBytesRcvd, latestGenerationStamp); if (blockReceiver != null) { // send checksum header blockReceiver.writeChecksumHeader(mirrorOut); @@ -464,7 +463,7 @@ protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block, // update its generation stamp if (isClient && stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { - block.setGenerationStamp(newGs); + block.setGenerationStamp(latestGenerationStamp); block.setNumBytes(minBytesRcvd); } @@ -499,10 +498,10 @@ protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block, } @Override - protected void opTransferBlock(final DataInputStream in, - final ExtendedBlock blk, final String client, - final DatanodeInfo[] targets, - final Token blockToken) throws IOException { + public void transferBlock(final ExtendedBlock blk, + final Token blockToken, + final String clientName, + final DatanodeInfo[] targets) throws IOException { checkAccess(null, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); @@ -511,19 +510,16 @@ protected void opTransferBlock(final DataInputStream in, final DataOutputStream out = new DataOutputStream( NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); try { - datanode.transferReplicaForPipelineRecovery(blk, targets, client); + datanode.transferReplicaForPipelineRecovery(blk, targets, clientName); writeResponse(Status.SUCCESS, out); } finally { IOUtils.closeStream(out); } } - /** - * Get block checksum (MD5 of CRC32). - */ @Override - protected void opBlockChecksum(DataInputStream in, ExtendedBlock block, - Token blockToken) throws IOException { + public void blockChecksum(final ExtendedBlock block, + final Token blockToken) throws IOException { final DataOutputStream out = new DataOutputStream( NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); checkAccess(out, true, block, blockToken, @@ -572,12 +568,9 @@ protected void opBlockChecksum(DataInputStream in, ExtendedBlock block, datanode.metrics.addBlockChecksumOp(elapsed()); } - /** - * Read a block from the disk and then sends it to a destination. - */ @Override - protected void opCopyBlock(DataInputStream in, ExtendedBlock block, - Token blockToken) throws IOException { + public void copyBlock(final ExtendedBlock block, + final Token blockToken) throws IOException { updateCurrentThreadName("Copying block " + block); // Read in the header if (datanode.isBlockTokenEnabled) { @@ -647,15 +640,12 @@ protected void opCopyBlock(DataInputStream in, ExtendedBlock block, datanode.metrics.addCopyBlockOp(elapsed()); } - /** - * Receive a block and write it to disk, it then notifies the namenode to - * remove the copy from the source. - */ @Override - protected void opReplaceBlock(DataInputStream in, - ExtendedBlock block, String sourceID, DatanodeInfo proxySource, - Token blockToken) throws IOException { - updateCurrentThreadName("Replacing block " + block + " from " + sourceID); + public void replaceBlock(final ExtendedBlock block, + final Token blockToken, + final String delHint, + final DatanodeInfo proxySource) throws IOException { + updateCurrentThreadName("Replacing block " + block + " from " + delHint); /* read header */ block.setNumBytes(dataXceiverServer.estimateBlockSize); @@ -699,7 +689,7 @@ protected void opReplaceBlock(DataInputStream in, new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE)); /* send request to the proxy */ - Sender.opCopyBlock(proxyOut, block, blockToken); + new Sender(proxyOut).copyBlock(block, blockToken); // receive the response from the proxy proxyReply = new DataInputStream(new BufferedInputStream( @@ -727,7 +717,7 @@ protected void opReplaceBlock(DataInputStream in, dataXceiverServer.balanceThrottler, null); // notify name node - datanode.notifyNamenodeReceivedBlock(block, sourceID); + datanode.notifyNamenodeReceivedBlock(block, delHint); LOG.info("Moved block " + block + " from " + s.getRemoteSocketAddress()); diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java index 841062d715..435333e07e 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java @@ -28,12 +28,13 @@ import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.server.balancer.Balancer; import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.hdfs.DFSConfigKeys; /** @@ -128,15 +129,20 @@ synchronized void release() { DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT)); } - /** - */ + @Override public void run() { while (datanode.shouldRun) { try { Socket s = ss.accept(); s.setTcpNoDelay(true); - new Daemon(datanode.threadGroup, - new DataXceiver(s, datanode, this)).start(); + final DataXceiver exciver; + try { + exciver = new DataXceiver(s, datanode, this); + } catch(IOException e) { + IOUtils.closeSocket(s); + throw e; + } + new Daemon(datanode.threadGroup, exciver).start(); } catch (SocketTimeoutException ignored) { // wake up to see if should continue to run } catch (IOException ie) { diff --git a/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj b/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj index 9d4f189180..2401d08d20 100644 --- a/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj +++ b/hdfs/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj @@ -46,7 +46,7 @@ public aspect DataTransferProtocolAspects { */ pointcut receiverOp(DataXceiver dataxceiver): - call(Op Receiver.readOp(DataInputStream)) && target(dataxceiver); + call(Op Receiver.readOp()) && target(dataxceiver); after(DataXceiver dataxceiver) returning(Op op): receiverOp(dataxceiver) { LOG.info("FI: receiverOp " + op + ", datanode=" diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java index a1adbb21ad..3a8d6e61db 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -683,8 +683,8 @@ public static BlockOpResponseProto transferRbw(final ExtendedBlock b, final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s)); // send the request - Sender.opTransferBlock(out, b, dfsClient.clientName, - new DatanodeInfo[]{datanodes[1]}, new Token()); + new Sender(out).transferBlock(b, new Token(), + dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}); out.flush(); return BlockOpResponseProto.parseDelimitedFrom(in); diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java index 9133002593..24c3bc48b1 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java @@ -72,7 +72,8 @@ public class TestDataTransferProtocol extends TestCase { DatanodeID datanode; InetSocketAddress dnAddr; ByteArrayOutputStream sendBuf = new ByteArrayOutputStream(128); - DataOutputStream sendOut = new DataOutputStream(sendBuf); + final DataOutputStream sendOut = new DataOutputStream(sendBuf); + final Sender sender = new Sender(sendOut); ByteArrayOutputStream recvBuf = new ByteArrayOutputStream(128); DataOutputStream recvOut = new DataOutputStream(recvBuf); @@ -185,9 +186,9 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n String description, Boolean eofExcepted) throws IOException { sendBuf.reset(); recvBuf.reset(); - Sender.opWriteBlock(sendOut, block, 0, - stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null, - new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN); + sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl", + new DatanodeInfo[1], null, stage, + 0, block.getNumBytes(), block.getNumBytes(), newGS); if (eofExcepted) { sendResponse(Status.ERROR, null, recvOut); sendRecvData(description, true); @@ -372,10 +373,11 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n /* Test OP_WRITE_BLOCK */ sendBuf.reset(); - Sender.opWriteBlock(sendOut, - new ExtendedBlock(poolId, newBlockId), 0, - BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null, - new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN); + sender.writeBlock(new ExtendedBlock(poolId, newBlockId), + BlockTokenSecretManager.DUMMY_TOKEN, "cl", + new DatanodeInfo[1], null, + BlockConstructionStage.PIPELINE_SETUP_CREATE, + 0, 0L, 0L, 0L); sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32); // bad bytes per checksum @@ -386,10 +388,10 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n sendBuf.reset(); recvBuf.reset(); - Sender.opWriteBlock(sendOut, - new ExtendedBlock(poolId, ++newBlockId), 0, - BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null, - new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN); + sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId), + BlockTokenSecretManager.DUMMY_TOKEN, "cl", + new DatanodeInfo[1], null, + BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L); sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32); sendOut.writeInt(512); @@ -409,10 +411,10 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n // test for writing a valid zero size block sendBuf.reset(); recvBuf.reset(); - Sender.opWriteBlock(sendOut, - new ExtendedBlock(poolId, ++newBlockId), 0, - BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null, - new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN); + sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId), + BlockTokenSecretManager.DUMMY_TOKEN, "cl", + new DatanodeInfo[1], null, + BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L); sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32); sendOut.writeInt(512); // checksum size @@ -439,22 +441,22 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n sendBuf.reset(); recvBuf.reset(); blk.setBlockId(blkid-1); - Sender.opReadBlock(sendOut, blk, 0L, fileLen, "cl", - BlockTokenSecretManager.DUMMY_TOKEN); + sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl", + 0L, fileLen); sendRecvData("Wrong block ID " + newBlockId + " for read", false); // negative block start offset -1L sendBuf.reset(); blk.setBlockId(blkid); - Sender.opReadBlock(sendOut, blk, -1L, fileLen, "cl", - BlockTokenSecretManager.DUMMY_TOKEN); + sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl", + -1L, fileLen); sendRecvData("Negative start-offset for read for block " + firstBlock.getBlockId(), false); // bad block start offset sendBuf.reset(); - Sender.opReadBlock(sendOut, blk, fileLen, fileLen, "cl", - BlockTokenSecretManager.DUMMY_TOKEN); + sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl", + fileLen, fileLen); sendRecvData("Wrong start-offset for reading block " + firstBlock.getBlockId(), false); @@ -462,8 +464,8 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n recvBuf.reset(); sendResponse(Status.SUCCESS, null, recvOut); sendBuf.reset(); - Sender.opReadBlock(sendOut, blk, 0L, - -1 - random.nextInt(oneMil), "cl", BlockTokenSecretManager.DUMMY_TOKEN); + sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl", + 0L, -1L-random.nextInt(oneMil)); sendRecvData("Negative length for reading block " + firstBlock.getBlockId(), false); @@ -471,15 +473,15 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n recvBuf.reset(); sendResponse(Status.ERROR, null, recvOut); sendBuf.reset(); - Sender.opReadBlock(sendOut, blk, 0L, - fileLen + 1, "cl", BlockTokenSecretManager.DUMMY_TOKEN); + sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl", + 0L, fileLen+1); sendRecvData("Wrong length for reading block " + firstBlock.getBlockId(), false); //At the end of all this, read the file to make sure that succeeds finally. sendBuf.reset(); - Sender.opReadBlock(sendOut, blk, 0L, - fileLen, "cl", BlockTokenSecretManager.DUMMY_TOKEN); + sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl", + 0L, fileLen); readFile(fileSys, file, fileLen); } finally { cluster.shutdown(); diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java index e6349cec64..93204eaccb 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java @@ -258,8 +258,8 @@ private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source, sock.setKeepAlive(true); // sendRequest DataOutputStream out = new DataOutputStream(sock.getOutputStream()); - Sender.opReplaceBlock(out, block, source - .getStorageID(), sourceProxy, BlockTokenSecretManager.DUMMY_TOKEN); + new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, + source.getStorageID(), sourceProxy); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java index ed862b8a31..067009a6c2 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java @@ -140,10 +140,10 @@ public void testReplicationError() throws Exception { // write the header. DataOutputStream out = new DataOutputStream(s.getOutputStream()); - Sender.opWriteBlock(out, block.getBlock(), 1, - BlockConstructionStage.PIPELINE_SETUP_CREATE, - 0L, 0L, 0L, "", null, new DatanodeInfo[0], - BlockTokenSecretManager.DUMMY_TOKEN); + new Sender(out).writeBlock(block.getBlock(), + BlockTokenSecretManager.DUMMY_TOKEN, "", + new DatanodeInfo[0], null, + BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L); // write check header out.writeByte( 1 );