HDFS-2087. Declare methods in DataTransferProtocol interface, and change Sender and Receiver to implement the interface.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1139124 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2011-06-23 23:57:18 +00:00
parent 1191be630f
commit 2f48fae72a
16 changed files with 295 additions and 244 deletions

View File

@ -526,6 +526,9 @@ Trunk (unreleased changes)
HDFS-2092. Remove some object references to Configuration in DFSClient.
(Bharath Mundlapudi via szetszwo)
HDFS-2087. Declare methods in DataTransferProtocol interface, and change
Sender and Receiver to implement the interface. (szetszwo)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -404,10 +404,9 @@ public static BlockReader newBlockReader( Socket sock, String file,
String clientName)
throws IOException {
// in and out will be closed when sock is closed (by the caller)
Sender.opReadBlock(
new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
block, startOffset, len, clientName, blockToken);
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
//
// Get bytes in block, set streams

View File

@ -1164,7 +1164,7 @@ public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+ Op.BLOCK_CHECKSUM + ", block=" + block);
}
// get block MD5
Sender.opBlockChecksum(out, block, lb.getBlockToken());
new Sender(out).blockChecksum(block, lb.getBlockToken());
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));

View File

@ -846,8 +846,8 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
DataNode.SMALL_BUFFER_SIZE));
//send the TRANSFER_BLOCK request
Sender.opTransferBlock(out, block,
dfsClient.clientName, targets, blockToken);
new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
targets);
//ack
in = new DataInputStream(NetUtils.getInputStream(sock));
@ -1019,10 +1019,9 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
// send the request
Sender.opWriteBlock(out, block,
nodes.length, recoveryFlag ? stage.getRecoveryStage() : stage, newGS,
block.getNumBytes(), bytesSent, dfsClient.clientName, null, nodes,
accessToken);
new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
nodes.length, block.getNumBytes(), bytesSent, newGS);
checksum.writeHeader(out);
out.flush();

View File

@ -17,10 +17,16 @@
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
/**
* Transfer data to/from datanode using a streaming protocol.
@ -35,8 +41,101 @@ public interface DataTransferProtocol {
* when protocol changes. It is not very obvious.
*/
/*
* Version 27:
* Move DataTransferProtocol and the inner classes to a package.
* Version 28:
* Declare methods in DataTransferProtocol interface.
*/
public static final int DATA_TRANSFER_VERSION = 27;
public static final int DATA_TRANSFER_VERSION = 28;
/**
* Read a block.
*
* @param blk the block being read.
* @param blockToken security token for accessing the block.
* @param clientName client's name.
* @param blockOffset offset of the block.
* @param length maximum number of bytes for this read.
*/
public void readBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length) throws IOException;
/**
* Write a block to a datanode pipeline.
*
* @param blk the block being written.
* @param blockToken security token for accessing the block.
* @param clientName client's name.
* @param targets target datanodes in the pipeline.
* @param source source datanode.
* @param stage pipeline stage.
* @param pipelineSize the size of the pipeline.
* @param minBytesRcvd minimum number of bytes received.
* @param maxBytesRcvd maximum number of bytes received.
* @param latestGenerationStamp the latest generation stamp of the block.
*/
public void writeBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets,
final DatanodeInfo source,
final BlockConstructionStage stage,
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp) throws IOException;
/**
* Transfer a block to another datanode.
* The block stage must be
* either {@link BlockConstructionStage#TRANSFER_RBW}
* or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
*
* @param blk the block being transferred.
* @param blockToken security token for accessing the block.
* @param clientName client's name.
* @param targets target datanodes.
*/
public void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets) throws IOException;
/**
* Receive a block from a source datanode
* and then notifies the namenode
* to remove the copy from the original datanode.
* Note that the source datanode and the original datanode can be different.
* It is used for balancing purpose.
*
* @param blk the block being replaced.
* @param blockToken security token for accessing the block.
* @param delHint the hint for deleting the block in the original datanode.
* @param source the source datanode for receiving the block.
*/
public void replaceBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo source) throws IOException;
/**
* Copy a block.
* It is used for balancing purpose.
*
* @param blk the block being copied.
* @param blockToken security token for accessing the block.
*/
public void copyBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken) throws IOException;
/**
* Get block checksum (MD5 of CRC32).
*
* @param blk a block.
* @param blockToken security token for accessing the block.
* @throws IOException
*/
public void blockChecksum(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken) throws IOException;
}

View File

@ -27,23 +27,26 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
/** Receiver */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class Receiver {
public abstract class Receiver implements DataTransferProtocol {
protected final DataInputStream in;
/** Create a receiver for DataTransferProtocol with a socket. */
protected Receiver(final DataInputStream in) {
this.in = in;
}
/** Read an Op. It also checks protocol version. */
protected final Op readOp(DataInputStream in) throws IOException {
protected final Op readOp() throws IOException {
final short version = in.readShort();
if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
throw new IOException( "Version Mismatch (Expected: " +
@ -54,11 +57,10 @@ protected final Op readOp(DataInputStream in) throws IOException {
}
/** Process op by the corresponding method. */
protected final void processOp(Op op, DataInputStream in
) throws IOException {
protected final void processOp(Op op) throws IOException {
switch(op) {
case READ_BLOCK:
opReadBlock(in);
opReadBlock();
break;
case WRITE_BLOCK:
opWriteBlock(in);
@ -81,121 +83,60 @@ protected final void processOp(Op op, DataInputStream in
}
/** Receive OP_READ_BLOCK */
private void opReadBlock(DataInputStream in) throws IOException {
private void opReadBlock() throws IOException {
OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
ExtendedBlock b = fromProto(
proto.getHeader().getBaseHeader().getBlock());
Token<BlockTokenIdentifier> token = fromProto(
proto.getHeader().getBaseHeader().getToken());
opReadBlock(in, b, proto.getOffset(), proto.getLen(),
proto.getHeader().getClientName(), token);
readBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
fromProto(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
proto.getOffset(),
proto.getLen());
}
/**
* Abstract OP_READ_BLOCK method. Read a block.
*/
protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk,
long offset, long length, String client,
Token<BlockTokenIdentifier> blockToken) throws IOException;
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
opWriteBlock(in,
fromProto(proto.getHeader().getBaseHeader().getBlock()),
proto.getPipelineSize(),
fromProto(proto.getStage()),
proto.getLatestGenerationStamp(),
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
writeBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
fromProto(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
fromProto(proto.getSource()),
fromProtos(proto.getTargetsList()),
fromProto(proto.getHeader().getBaseHeader().getToken()));
fromProto(proto.getSource()),
fromProto(proto.getStage()),
proto.getPipelineSize(),
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
proto.getLatestGenerationStamp());
}
/**
* Abstract OP_WRITE_BLOCK method.
* Write a block.
*/
protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk,
int pipelineSize, BlockConstructionStage stage, long newGs,
long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
throws IOException;
/** Receive {@link Op#TRANSFER_BLOCK} */
private void opTransferBlock(DataInputStream in) throws IOException {
final OpTransferBlockProto proto =
OpTransferBlockProto.parseFrom(vintPrefixed(in));
opTransferBlock(in,
fromProto(proto.getHeader().getBaseHeader().getBlock()),
transferBlock(fromProto(proto.getHeader().getBaseHeader().getBlock()),
fromProto(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
fromProtos(proto.getTargetsList()),
fromProto(proto.getHeader().getBaseHeader().getToken()));
fromProtos(proto.getTargetsList()));
}
/**
* Abstract {@link Op#TRANSFER_BLOCK} method.
* For {@link BlockConstructionStage#TRANSFER_RBW}
* or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
*/
protected abstract void opTransferBlock(DataInputStream in, ExtendedBlock blk,
String client, DatanodeInfo[] targets,
Token<BlockTokenIdentifier> blockToken)
throws IOException;
/** Receive OP_REPLACE_BLOCK */
private void opReplaceBlock(DataInputStream in) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
opReplaceBlock(in,
fromProto(proto.getHeader().getBlock()),
replaceBlock(fromProto(proto.getHeader().getBlock()),
fromProto(proto.getHeader().getToken()),
proto.getDelHint(),
fromProto(proto.getSource()),
fromProto(proto.getHeader().getToken()));
fromProto(proto.getSource()));
}
/**
* Abstract OP_REPLACE_BLOCK method.
* It is used for balancing purpose; send to a destination
*/
protected abstract void opReplaceBlock(DataInputStream in,
ExtendedBlock blk, String delHint, DatanodeInfo src,
Token<BlockTokenIdentifier> blockToken) throws IOException;
/** Receive OP_COPY_BLOCK */
private void opCopyBlock(DataInputStream in) throws IOException {
OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
opCopyBlock(in,
fromProto(proto.getHeader().getBlock()),
copyBlock(fromProto(proto.getHeader().getBlock()),
fromProto(proto.getHeader().getToken()));
}
/**
* Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to
* a proxy source.
*/
protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken)
throws IOException;
/** Receive OP_BLOCK_CHECKSUM */
private void opBlockChecksum(DataInputStream in) throws IOException {
OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
opBlockChecksum(in,
fromProto(proto.getHeader().getBlock()),
blockChecksum(fromProto(proto.getHeader().getBlock()),
fromProto(proto.getHeader().getToken()));
}
/**
* Abstract OP_BLOCK_CHECKSUM method.
* Get the checksum of a block
*/
protected abstract void opBlockChecksum(DataInputStream in,
ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
throws IOException;
}

View File

@ -44,7 +44,14 @@
/** Sender */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class Sender {
public class Sender implements DataTransferProtocol {
private final DataOutputStream out;
/** Create a sender for DataTransferProtocol with a output stream. */
public Sender(final DataOutputStream out) {
this.out = out;
}
/** Initialize a operation. */
private static void op(final DataOutput out, final Op op
) throws IOException {
@ -59,79 +66,85 @@ private static void send(final DataOutputStream out, final Op opcode,
out.flush();
}
/** Send OP_READ_BLOCK */
public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
long blockOffset, long blockLen, String clientName,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
@Override
public void readBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length) throws IOException {
OpReadBlockProto proto = OpReadBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
.setOffset(blockOffset)
.setLen(blockLen)
.setLen(length)
.build();
send(out, Op.READ_BLOCK, proto);
}
/** Send OP_WRITE_BLOCK */
public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
int pipelineSize, BlockConstructionStage stage, long newGs,
long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(blk, client,
blockToken);
@Override
public void writeBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets,
final DatanodeInfo source,
final BlockConstructionStage stage,
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp) throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken);
OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
.setHeader(header)
.addAllTargets(
toProtos(targets, 1))
.addAllTargets(toProtos(targets, 1))
.setStage(toProto(stage))
.setPipelineSize(pipelineSize)
.setMinBytesRcvd(minBytesRcvd)
.setMaxBytesRcvd(maxBytesRcvd)
.setLatestGenerationStamp(newGs);
.setLatestGenerationStamp(latestGenerationStamp);
if (src != null) {
proto.setSource(toProto(src));
if (source != null) {
proto.setSource(toProto(source));
}
send(out, Op.WRITE_BLOCK, proto.build());
}
/** Send {@link Op#TRANSFER_BLOCK} */
public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
String client, DatanodeInfo[] targets,
Token<BlockTokenIdentifier> blockToken) throws IOException {
@Override
public void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets) throws IOException {
OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(
blk, client, blockToken))
blk, clientName, blockToken))
.addAllTargets(toProtos(targets, 0))
.build();
send(out, Op.TRANSFER_BLOCK, proto);
}
/** Send OP_REPLACE_BLOCK */
public static void opReplaceBlock(DataOutputStream out,
ExtendedBlock blk, String delHint, DatanodeInfo src,
Token<BlockTokenIdentifier> blockToken) throws IOException {
@Override
public void replaceBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo source) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
.setDelHint(delHint)
.setSource(toProto(src))
.setSource(toProto(source))
.build();
send(out, Op.REPLACE_BLOCK, proto);
}
/** Send OP_COPY_BLOCK */
public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
@Override
public void copyBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
.build();
@ -139,10 +152,9 @@ public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
send(out, Op.COPY_BLOCK, proto);
}
/** Send OP_BLOCK_CHECKSUM */
public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
@Override
public void blockChecksum(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
.build();

View File

@ -348,8 +348,8 @@ private void dispatch() {
private void sendRequest(DataOutputStream out) throws IOException {
final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
Sender.opReplaceBlock(out, eb, source.getStorageID(),
proxySource.getDatanode(), accessToken);
new Sender(out).replaceBlock(eb, accessToken,
source.getStorageID(), proxySource.getDatanode());
}
/* Receive a block copy response from the input stream */

View File

@ -1977,8 +1977,8 @@ public void run() {
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
}
Sender.opWriteBlock(out,
b, 0, stage, 0, 0, 0, clientname, srcNode, targets, accessToken);
new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
stage, 0, 0, 0, 0);
// send data & checksum
blockSender.sendBlock(out, baseStream, null);

View File

@ -85,7 +85,10 @@ class DataXceiver extends Receiver implements Runnable, FSConstants {
private long opStartTime; //the start time of receiving an Op
public DataXceiver(Socket s, DataNode datanode,
DataXceiverServer dataXceiverServer) {
DataXceiverServer dataXceiverServer) throws IOException {
super(new DataInputStream(new BufferedInputStream(
NetUtils.getInputStream(s), FSConstants.SMALL_BUFFER_SIZE)));
this.s = s;
this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
this.datanode = datanode;
@ -127,13 +130,9 @@ private void updateCurrentThreadName(String status) {
public void run() {
updateCurrentThreadName("Waiting for operation");
DataInputStream in=null;
int opsProcessed = 0;
Op op = null;
try {
in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(s),
SMALL_BUFFER_SIZE));
int stdTimeout = s.getSoTimeout();
// We process requests in a loop, and stay around for a short timeout.
@ -145,7 +144,7 @@ public void run() {
assert socketKeepaliveTimeout > 0;
s.setSoTimeout(socketKeepaliveTimeout);
}
op = readOp(in);
op = readOp();
} catch (InterruptedIOException ignored) {
// Time out while we wait for client rpc
break;
@ -176,7 +175,7 @@ public void run() {
}
opStartTime = now();
processOp(op, in);
processOp(op);
++opsProcessed;
} while (!s.isClosed() && socketKeepaliveTimeout > 0);
} catch (Throwable t) {
@ -196,13 +195,12 @@ public void run() {
}
}
/**
* Read a block from the disk.
*/
@Override
protected void opReadBlock(DataInputStream in, ExtendedBlock block,
long startOffset, long length, String clientName,
Token<BlockTokenIdentifier> blockToken) throws IOException {
public void readBlock(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length) throws IOException {
OutputStream baseStream = NetUtils.getOutputStream(s,
datanode.socketWriteTimeout);
DataOutputStream out = new DataOutputStream(
@ -225,7 +223,7 @@ protected void opReadBlock(DataInputStream in, ExtendedBlock block,
updateCurrentThreadName("Sending block " + block);
try {
try {
blockSender = new BlockSender(block, startOffset, length,
blockSender = new BlockSender(block, blockOffset, length,
true, true, false, datanode, clientTraceFmt);
} catch(IOException e) {
LOG.info("opReadBlock " + block + " received exception " + e);
@ -284,16 +282,17 @@ protected void opReadBlock(DataInputStream in, ExtendedBlock block,
datanode.metrics.incrReadsFromClient(isLocal);
}
/**
* Write a block to disk.
*/
@Override
protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block,
final int pipelineSize, final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode,
final DatanodeInfo[] targets, final Token<BlockTokenIdentifier> blockToken
) throws IOException {
public void writeBlock(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken,
final String clientname,
final DatanodeInfo[] targets,
final DatanodeInfo srcDataNode,
final BlockConstructionStage stage,
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp) throws IOException {
updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
final boolean isDatanode = clientname.length() == 0;
final boolean isClient = !isDatanode;
@ -308,7 +307,7 @@ protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block,
if (LOG.isDebugEnabled()) {
LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname
+ "\n block =" + block + ", newGs=" + newGs
+ "\n block =" + block + ", newGs=" + latestGenerationStamp
+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
+ "\n targets=" + Arrays.asList(targets)
+ "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
@ -351,10 +350,10 @@ protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block,
blockReceiver = new BlockReceiver(block, in,
s.getRemoteSocketAddress().toString(),
s.getLocalSocketAddress().toString(),
stage, newGs, minBytesRcvd, maxBytesRcvd,
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode);
} else {
datanode.data.recoverClose(block, newGs, minBytesRcvd);
datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
}
//
@ -380,9 +379,9 @@ protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block,
SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
Sender.opWriteBlock(mirrorOut, originalBlock,
pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname,
srcDataNode, targets, blockToken);
new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
clientname, targets, srcDataNode, stage, pipelineSize,
minBytesRcvd, maxBytesRcvd, latestGenerationStamp);
if (blockReceiver != null) { // send checksum header
blockReceiver.writeChecksumHeader(mirrorOut);
@ -464,7 +463,7 @@ protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block,
// update its generation stamp
if (isClient &&
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
block.setGenerationStamp(newGs);
block.setGenerationStamp(latestGenerationStamp);
block.setNumBytes(minBytesRcvd);
}
@ -499,10 +498,10 @@ protected void opWriteBlock(final DataInputStream in, final ExtendedBlock block,
}
@Override
protected void opTransferBlock(final DataInputStream in,
final ExtendedBlock blk, final String client,
final DatanodeInfo[] targets,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
public void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final DatanodeInfo[] targets) throws IOException {
checkAccess(null, true, blk, blockToken,
Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
@ -511,19 +510,16 @@ protected void opTransferBlock(final DataInputStream in,
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
try {
datanode.transferReplicaForPipelineRecovery(blk, targets, client);
datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
writeResponse(Status.SUCCESS, out);
} finally {
IOUtils.closeStream(out);
}
}
/**
* Get block checksum (MD5 of CRC32).
*/
@Override
protected void opBlockChecksum(DataInputStream in, ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken) throws IOException {
public void blockChecksum(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
checkAccess(out, true, block, blockToken,
@ -572,12 +568,9 @@ protected void opBlockChecksum(DataInputStream in, ExtendedBlock block,
datanode.metrics.addBlockChecksumOp(elapsed());
}
/**
* Read a block from the disk and then sends it to a destination.
*/
@Override
protected void opCopyBlock(DataInputStream in, ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken) throws IOException {
public void copyBlock(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
updateCurrentThreadName("Copying block " + block);
// Read in the header
if (datanode.isBlockTokenEnabled) {
@ -647,15 +640,12 @@ protected void opCopyBlock(DataInputStream in, ExtendedBlock block,
datanode.metrics.addCopyBlockOp(elapsed());
}
/**
* Receive a block and write it to disk, it then notifies the namenode to
* remove the copy from the source.
*/
@Override
protected void opReplaceBlock(DataInputStream in,
ExtendedBlock block, String sourceID, DatanodeInfo proxySource,
Token<BlockTokenIdentifier> blockToken) throws IOException {
updateCurrentThreadName("Replacing block " + block + " from " + sourceID);
public void replaceBlock(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo proxySource) throws IOException {
updateCurrentThreadName("Replacing block " + block + " from " + delHint);
/* read header */
block.setNumBytes(dataXceiverServer.estimateBlockSize);
@ -699,7 +689,7 @@ protected void opReplaceBlock(DataInputStream in,
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
/* send request to the proxy */
Sender.opCopyBlock(proxyOut, block, blockToken);
new Sender(proxyOut).copyBlock(block, blockToken);
// receive the response from the proxy
proxyReply = new DataInputStream(new BufferedInputStream(
@ -727,7 +717,7 @@ protected void opReplaceBlock(DataInputStream in,
dataXceiverServer.balanceThrottler, null);
// notify name node
datanode.notifyNamenodeReceivedBlock(block, sourceID);
datanode.notifyNamenodeReceivedBlock(block, delHint);
LOG.info("Moved block " + block +
" from " + s.getRemoteSocketAddress());

View File

@ -28,12 +28,13 @@
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hdfs.DFSConfigKeys;
/**
@ -128,15 +129,20 @@ synchronized void release() {
DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
}
/**
*/
@Override
public void run() {
while (datanode.shouldRun) {
try {
Socket s = ss.accept();
s.setTcpNoDelay(true);
new Daemon(datanode.threadGroup,
new DataXceiver(s, datanode, this)).start();
final DataXceiver exciver;
try {
exciver = new DataXceiver(s, datanode, this);
} catch(IOException e) {
IOUtils.closeSocket(s);
throw e;
}
new Daemon(datanode.threadGroup, exciver).start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
} catch (IOException ie) {

View File

@ -46,7 +46,7 @@ public aspect DataTransferProtocolAspects {
*/
pointcut receiverOp(DataXceiver dataxceiver):
call(Op Receiver.readOp(DataInputStream)) && target(dataxceiver);
call(Op Receiver.readOp()) && target(dataxceiver);
after(DataXceiver dataxceiver) returning(Op op): receiverOp(dataxceiver) {
LOG.info("FI: receiverOp " + op + ", datanode="

View File

@ -683,8 +683,8 @@ public static BlockOpResponseProto transferRbw(final ExtendedBlock b,
final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s));
// send the request
Sender.opTransferBlock(out, b, dfsClient.clientName,
new DatanodeInfo[]{datanodes[1]}, new Token<BlockTokenIdentifier>());
new Sender(out).transferBlock(b, new Token<BlockTokenIdentifier>(),
dfsClient.clientName, new DatanodeInfo[]{datanodes[1]});
out.flush();
return BlockOpResponseProto.parseDelimitedFrom(in);

View File

@ -72,7 +72,8 @@ public class TestDataTransferProtocol extends TestCase {
DatanodeID datanode;
InetSocketAddress dnAddr;
ByteArrayOutputStream sendBuf = new ByteArrayOutputStream(128);
DataOutputStream sendOut = new DataOutputStream(sendBuf);
final DataOutputStream sendOut = new DataOutputStream(sendBuf);
final Sender sender = new Sender(sendOut);
ByteArrayOutputStream recvBuf = new ByteArrayOutputStream(128);
DataOutputStream recvOut = new DataOutputStream(recvBuf);
@ -185,9 +186,9 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
String description, Boolean eofExcepted) throws IOException {
sendBuf.reset();
recvBuf.reset();
Sender.opWriteBlock(sendOut, block, 0,
stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS);
if (eofExcepted) {
sendResponse(Status.ERROR, null, recvOut);
sendRecvData(description, true);
@ -372,10 +373,11 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
/* Test OP_WRITE_BLOCK */
sendBuf.reset();
Sender.opWriteBlock(sendOut,
new ExtendedBlock(poolId, newBlockId), 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sender.writeBlock(new ExtendedBlock(poolId, newBlockId),
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE,
0, 0L, 0L, 0L);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
// bad bytes per checksum
@ -386,10 +388,10 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
sendBuf.reset();
recvBuf.reset();
Sender.opWriteBlock(sendOut,
new ExtendedBlock(poolId, ++newBlockId), 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512);
@ -409,10 +411,10 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
// test for writing a valid zero size block
sendBuf.reset();
recvBuf.reset();
Sender.opWriteBlock(sendOut,
new ExtendedBlock(poolId, ++newBlockId), 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
sender.writeBlock(new ExtendedBlock(poolId, ++newBlockId),
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512); // checksum size
@ -439,22 +441,22 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
sendBuf.reset();
recvBuf.reset();
blk.setBlockId(blkid-1);
Sender.opReadBlock(sendOut, blk, 0L, fileLen, "cl",
BlockTokenSecretManager.DUMMY_TOKEN);
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, fileLen);
sendRecvData("Wrong block ID " + newBlockId + " for read", false);
// negative block start offset -1L
sendBuf.reset();
blk.setBlockId(blkid);
Sender.opReadBlock(sendOut, blk, -1L, fileLen, "cl",
BlockTokenSecretManager.DUMMY_TOKEN);
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-1L, fileLen);
sendRecvData("Negative start-offset for read for block " +
firstBlock.getBlockId(), false);
// bad block start offset
sendBuf.reset();
Sender.opReadBlock(sendOut, blk, fileLen, fileLen, "cl",
BlockTokenSecretManager.DUMMY_TOKEN);
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
fileLen, fileLen);
sendRecvData("Wrong start-offset for reading block " +
firstBlock.getBlockId(), false);
@ -462,8 +464,8 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
recvBuf.reset();
sendResponse(Status.SUCCESS, null, recvOut);
sendBuf.reset();
Sender.opReadBlock(sendOut, blk, 0L,
-1 - random.nextInt(oneMil), "cl", BlockTokenSecretManager.DUMMY_TOKEN);
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, -1L-random.nextInt(oneMil));
sendRecvData("Negative length for reading block " +
firstBlock.getBlockId(), false);
@ -471,15 +473,15 @@ private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long n
recvBuf.reset();
sendResponse(Status.ERROR, null, recvOut);
sendBuf.reset();
Sender.opReadBlock(sendOut, blk, 0L,
fileLen + 1, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, fileLen+1);
sendRecvData("Wrong length for reading block " +
firstBlock.getBlockId(), false);
//At the end of all this, read the file to make sure that succeeds finally.
sendBuf.reset();
Sender.opReadBlock(sendOut, blk, 0L,
fileLen, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, fileLen);
readFile(fileSys, file, fileLen);
} finally {
cluster.shutdown();

View File

@ -258,8 +258,8 @@ private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
sock.setKeepAlive(true);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
Sender.opReplaceBlock(out, block, source
.getStorageID(), sourceProxy, BlockTokenSecretManager.DUMMY_TOKEN);
new Sender(out).replaceBlock(block, BlockTokenSecretManager.DUMMY_TOKEN,
source.getStorageID(), sourceProxy);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());

View File

@ -140,10 +140,10 @@ public void testReplicationError() throws Exception {
// write the header.
DataOutputStream out = new DataOutputStream(s.getOutputStream());
Sender.opWriteBlock(out, block.getBlock(), 1,
BlockConstructionStage.PIPELINE_SETUP_CREATE,
0L, 0L, 0L, "", null, new DatanodeInfo[0],
BlockTokenSecretManager.DUMMY_TOKEN);
new Sender(out).writeBlock(block.getBlock(),
BlockTokenSecretManager.DUMMY_TOKEN, "",
new DatanodeInfo[0], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L);
// write check header
out.writeByte( 1 );