HDFS-2496. Separate datatypes for DatanodeProtocol. Contributed by Suresh Srinivas.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1189901 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
724f217343
commit
456ca74b96
@ -51,6 +51,8 @@ Trunk (unreleased changes)
|
||||
|
||||
HDFS-2488. Separate datatypes for InterDatanodeProtocol. (suresh)
|
||||
|
||||
HDFS-2496. Separate datatypes for DatanodeProtocol. (suresh)
|
||||
|
||||
BUG FIXES
|
||||
HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)
|
||||
|
||||
|
@ -37,9 +37,6 @@
|
||||
serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
|
||||
@TokenInfo(BlockTokenSelector.class)
|
||||
public interface ClientDatanodeProtocol extends VersionedProtocol {
|
||||
public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
|
||||
|
||||
|
||||
/**
|
||||
* Until version 9, this class ClientDatanodeProtocol served as both
|
||||
* the client interface to the DN AND the RPC protocol used to
|
||||
|
@ -92,4 +92,20 @@ public long getNumBytes() {
|
||||
public long getGenerationStamp() {
|
||||
return generationStamp;
|
||||
}
|
||||
|
||||
public static Block[] convert(BlockWritable[] blocks) {
|
||||
Block[] ret = new Block[blocks.length];
|
||||
for (int i = 0; i < blocks.length; i++) {
|
||||
ret[i] = blocks[i].convert();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
public static BlockWritable[] convert(Block[] blocks) {
|
||||
BlockWritable[] ret = new BlockWritable[blocks.length];
|
||||
for (int i = 0; i < blocks.length; i++) {
|
||||
ret[i] = BlockWritable.convert(blocks[i]);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
@ -67,7 +67,6 @@ public BlockCommand() {}
|
||||
public BlockCommand(int action, String poolId,
|
||||
List<BlockTargetPair> blocktargetlist) {
|
||||
super(action);
|
||||
|
||||
this.poolId = poolId;
|
||||
blocks = new Block[blocktargetlist.size()];
|
||||
targets = new DatanodeInfo[blocks.length][];
|
||||
@ -85,12 +84,21 @@ public BlockCommand(int action, String poolId,
|
||||
* @param blocks blocks related to the action
|
||||
*/
|
||||
public BlockCommand(int action, String poolId, Block blocks[]) {
|
||||
this(action, poolId, blocks, EMPTY_TARGET);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create BlockCommand for the given action
|
||||
* @param blocks blocks related to the action
|
||||
*/
|
||||
public BlockCommand(int action, String poolId, Block[] blocks,
|
||||
DatanodeInfo[][] targets) {
|
||||
super(action);
|
||||
this.poolId = poolId;
|
||||
this.blocks = blocks;
|
||||
this.targets = EMPTY_TARGET;
|
||||
this.targets = targets;
|
||||
}
|
||||
|
||||
|
||||
public String getBlockPoolId() {
|
||||
return poolId;
|
||||
}
|
||||
|
@ -117,8 +117,12 @@ public BlockRecoveryCommand() {
|
||||
* the specified capacity for recovering blocks.
|
||||
*/
|
||||
public BlockRecoveryCommand(int capacity) {
|
||||
this(new ArrayList<RecoveringBlock>(capacity));
|
||||
}
|
||||
|
||||
public BlockRecoveryCommand(Collection<RecoveringBlock> blocks) {
|
||||
super(DatanodeProtocol.DNA_RECOVERBLOCK);
|
||||
recoveringBlocks = new ArrayList<RecoveringBlock>(capacity);
|
||||
recoveringBlocks = blocks;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -22,10 +22,11 @@
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocolR23Compatible.DatanodeWireProtocol;
|
||||
import org.apache.hadoop.ipc.VersionedProtocol;
|
||||
import org.apache.hadoop.security.KerberosInfo;
|
||||
|
||||
@ -45,7 +46,14 @@
|
||||
@InterfaceAudience.Private
|
||||
public interface DatanodeProtocol extends VersionedProtocol {
|
||||
/**
|
||||
* 28: Add Balancer Bandwidth Command protocol.
|
||||
* This class is used by both the Namenode (client) and BackupNode (server)
|
||||
* to insulate from the protocol serialization.
|
||||
*
|
||||
* If you are adding/changing DN's interface then you need to
|
||||
* change both this class and ALSO
|
||||
* {@link DatanodeWireProtocol}.
|
||||
* These changes need to be done in a compatible fashion as described in
|
||||
* {@link ClientNamenodeWireProtocol}
|
||||
*/
|
||||
public static final long versionID = 28L;
|
||||
|
||||
|
@ -63,9 +63,14 @@ public DatanodeRegistration() {
|
||||
* Create DatanodeRegistration
|
||||
*/
|
||||
public DatanodeRegistration(String nodeName) {
|
||||
this(nodeName, new StorageInfo(), new ExportedBlockKeys());
|
||||
}
|
||||
|
||||
public DatanodeRegistration(String nodeName, StorageInfo info,
|
||||
ExportedBlockKeys keys) {
|
||||
super(nodeName);
|
||||
this.storageInfo = new StorageInfo();
|
||||
this.exportedKeys = new ExportedBlockKeys();
|
||||
this.storageInfo = info;
|
||||
this.exportedKeys = keys;
|
||||
}
|
||||
|
||||
public void setStorageInfo(StorageInfo storage) {
|
||||
|
@ -0,0 +1,107 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableFactories;
|
||||
import org.apache.hadoop.io.WritableFactory;
|
||||
|
||||
/**
|
||||
* Balancer bandwidth command instructs each datanode to change its value for
|
||||
* the max amount of network bandwidth it may use during the block balancing
|
||||
* operation.
|
||||
*
|
||||
* The Balancer Bandwidth Command contains the new bandwidth value as its
|
||||
* payload. The bandwidth value is in bytes per second.
|
||||
*/
|
||||
public class BalancerBandwidthCommandWritable extends DatanodeCommandWritable {
|
||||
private final static long BBC_DEFAULTBANDWIDTH = 0L;
|
||||
|
||||
private long bandwidth;
|
||||
|
||||
/**
|
||||
* Balancer Bandwidth Command constructor. Sets bandwidth to 0.
|
||||
*/
|
||||
BalancerBandwidthCommandWritable() {
|
||||
this(BBC_DEFAULTBANDWIDTH);
|
||||
}
|
||||
|
||||
/**
|
||||
* Balancer Bandwidth Command constructor.
|
||||
* @param bandwidth Blanacer bandwidth in bytes per second.
|
||||
*/
|
||||
public BalancerBandwidthCommandWritable(long bandwidth) {
|
||||
super(DatanodeWireProtocol.DNA_BALANCERBANDWIDTHUPDATE);
|
||||
this.bandwidth = bandwidth;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current value of the max balancer bandwidth in bytes per second.
|
||||
* @return bandwidth Blanacer bandwidth in bytes per second for this datanode.
|
||||
*/
|
||||
public long getBalancerBandwidthValue() {
|
||||
return this.bandwidth;
|
||||
}
|
||||
|
||||
// ///////////////////////////////////////////////
|
||||
// Writable
|
||||
// ///////////////////////////////////////////////
|
||||
static { // register a ctor
|
||||
WritableFactories.setFactory(BalancerBandwidthCommandWritable.class,
|
||||
new WritableFactory() {
|
||||
public Writable newInstance() {
|
||||
return new BalancerBandwidthCommandWritable();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the bandwidth payload to the Balancer Bandwidth Command packet.
|
||||
* @param out DataOutput stream used for writing commands to the datanode.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void write(DataOutput out) throws IOException {
|
||||
super.write(out);
|
||||
out.writeLong(this.bandwidth);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the bandwidth payload from the Balancer Bandwidth Command packet.
|
||||
* @param in DataInput stream used for reading commands to the datanode.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
super.readFields(in);
|
||||
this.bandwidth = in.readLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatanodeCommand convert() {
|
||||
return new BalancerBandwidthCommand(bandwidth);
|
||||
}
|
||||
|
||||
public static DatanodeCommandWritable convert(BalancerBandwidthCommand cmd) {
|
||||
return new BalancerBandwidthCommandWritable(cmd.getBalancerBandwidthValue());
|
||||
}
|
||||
}
|
@ -0,0 +1,147 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.BlockWritable;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeInfoWritable;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableFactories;
|
||||
import org.apache.hadoop.io.WritableFactory;
|
||||
|
||||
/****************************************************
|
||||
* A BlockCommand is an instruction to a datanode regarding some blocks under
|
||||
* its control. It tells the DataNode to either invalidate a set of indicated
|
||||
* blocks, or to copy a set of indicated blocks to another DataNode.
|
||||
*
|
||||
****************************************************/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class BlockCommandWritable extends DatanodeCommandWritable {
|
||||
|
||||
/**
|
||||
* This constant is used to indicate that the block deletion does not need
|
||||
* explicit ACK from the datanode. When a block is put into the list of blocks
|
||||
* to be deleted, it's size is set to this constant. We assume that no block
|
||||
* would actually have this size. Otherwise, we would miss ACKs for blocks
|
||||
* with such size. Positive number is used for compatibility reasons.
|
||||
*/
|
||||
public static final long NO_ACK = Long.MAX_VALUE;
|
||||
|
||||
String poolId;
|
||||
BlockWritable blocks[];
|
||||
DatanodeInfoWritable targets[][];
|
||||
|
||||
public BlockCommandWritable() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Create BlockCommand for the given action
|
||||
*
|
||||
* @param blocks blocks related to the action
|
||||
*/
|
||||
public BlockCommandWritable(int action, String poolId, BlockWritable[] blocks,
|
||||
DatanodeInfoWritable[][] targets) {
|
||||
super(action);
|
||||
this.poolId = poolId;
|
||||
this.blocks = blocks;
|
||||
this.targets = targets;
|
||||
}
|
||||
|
||||
// /////////////////////////////////////////
|
||||
// Writable
|
||||
// /////////////////////////////////////////
|
||||
static { // register a ctor
|
||||
WritableFactories.setFactory(BlockCommandWritable.class,
|
||||
new WritableFactory() {
|
||||
public Writable newInstance() {
|
||||
return new BlockCommandWritable();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
super.write(out);
|
||||
Text.writeString(out, poolId);
|
||||
out.writeInt(blocks.length);
|
||||
for (int i = 0; i < blocks.length; i++) {
|
||||
blocks[i].write(out);
|
||||
}
|
||||
out.writeInt(targets.length);
|
||||
for (int i = 0; i < targets.length; i++) {
|
||||
out.writeInt(targets[i].length);
|
||||
for (int j = 0; j < targets[i].length; j++) {
|
||||
targets[i][j].write(out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
super.readFields(in);
|
||||
this.poolId = Text.readString(in);
|
||||
this.blocks = new BlockWritable[in.readInt()];
|
||||
for (int i = 0; i < blocks.length; i++) {
|
||||
blocks[i] = new BlockWritable();
|
||||
blocks[i].readFields(in);
|
||||
}
|
||||
|
||||
this.targets = new DatanodeInfoWritable[in.readInt()][];
|
||||
for (int i = 0; i < targets.length; i++) {
|
||||
this.targets[i] = new DatanodeInfoWritable[in.readInt()];
|
||||
for (int j = 0; j < targets[i].length; j++) {
|
||||
targets[i][j] = new DatanodeInfoWritable();
|
||||
targets[i][j].readFields(in);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockCommand convert() {
|
||||
DatanodeInfo[][] dinfo = new DatanodeInfo[targets.length][];
|
||||
for (int i = 0; i < targets.length; i++) {
|
||||
dinfo[i] = DatanodeInfoWritable.convertDatanodeInfo(targets[i]);
|
||||
}
|
||||
return new BlockCommand(getAction(), poolId, BlockWritable.convert(blocks),
|
||||
dinfo);
|
||||
}
|
||||
|
||||
public static BlockCommandWritable convert(BlockCommand cmd) {
|
||||
if (cmd == null) return null;
|
||||
DatanodeInfo[][] targets = cmd.getTargets();
|
||||
DatanodeInfoWritable[][] dinfo = new DatanodeInfoWritable[targets.length][];
|
||||
for (int i = 0; i < targets.length; i++) {
|
||||
dinfo[i] = DatanodeInfoWritable.convertDatanodeInfo(targets[i]);
|
||||
}
|
||||
return new BlockCommandWritable(cmd.getAction(), cmd.getBlockPoolId(),
|
||||
BlockWritable.convert(cmd.getBlocks()), dinfo);
|
||||
}
|
||||
}
|
@ -0,0 +1,118 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableFactories;
|
||||
import org.apache.hadoop.io.WritableFactory;
|
||||
|
||||
/**
|
||||
* BlockRecoveryCommand is an instruction to a data-node to recover the
|
||||
* specified blocks.
|
||||
*
|
||||
* The data-node that receives this command treats itself as a primary data-node
|
||||
* in the recover process.
|
||||
*
|
||||
* Block recovery is identified by a recoveryId, which is also the new
|
||||
* generation stamp, which the block will have after the recovery succeeds.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class BlockRecoveryCommandWritable extends DatanodeCommandWritable {
|
||||
Collection<RecoveringBlockWritable> recoveringBlocks;
|
||||
|
||||
/**
|
||||
* Create empty BlockRecoveryCommand.
|
||||
*/
|
||||
public BlockRecoveryCommandWritable() { }
|
||||
|
||||
/**
|
||||
* Create BlockRecoveryCommand with the specified capacity for recovering
|
||||
* blocks.
|
||||
*/
|
||||
public BlockRecoveryCommandWritable(int capacity) {
|
||||
this(new ArrayList<RecoveringBlockWritable>(capacity));
|
||||
}
|
||||
|
||||
public BlockRecoveryCommandWritable(Collection<RecoveringBlockWritable> blocks) {
|
||||
super(DatanodeWireProtocol.DNA_RECOVERBLOCK);
|
||||
recoveringBlocks = blocks;
|
||||
}
|
||||
|
||||
// /////////////////////////////////////////
|
||||
// Writable
|
||||
// /////////////////////////////////////////
|
||||
static { // register a ctor
|
||||
WritableFactories.setFactory(BlockRecoveryCommandWritable.class,
|
||||
new WritableFactory() {
|
||||
public Writable newInstance() {
|
||||
return new BlockRecoveryCommandWritable();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void write(DataOutput out) throws IOException {
|
||||
super.write(out);
|
||||
out.writeInt(recoveringBlocks.size());
|
||||
for (RecoveringBlockWritable block : recoveringBlocks) {
|
||||
block.write(out);
|
||||
}
|
||||
}
|
||||
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
super.readFields(in);
|
||||
int numBlocks = in.readInt();
|
||||
recoveringBlocks = new ArrayList<RecoveringBlockWritable>(numBlocks);
|
||||
for (int i = 0; i < numBlocks; i++) {
|
||||
RecoveringBlockWritable b = new RecoveringBlockWritable();
|
||||
b.readFields(in);
|
||||
recoveringBlocks.add(b);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatanodeCommand convert() {
|
||||
Collection<RecoveringBlock> blks =
|
||||
new ArrayList<RecoveringBlock>(recoveringBlocks.size());
|
||||
for (RecoveringBlockWritable b : recoveringBlocks) {
|
||||
blks.add(b.convert());
|
||||
}
|
||||
return new BlockRecoveryCommand(blks);
|
||||
}
|
||||
|
||||
public static BlockRecoveryCommandWritable convert(BlockRecoveryCommand cmd) {
|
||||
if (cmd == null) return null;
|
||||
Collection<RecoveringBlockWritable> blks =
|
||||
new ArrayList<RecoveringBlockWritable>(cmd.getRecoveringBlocks().size());
|
||||
for (RecoveringBlock b : cmd.getRecoveringBlocks()) {
|
||||
blks.add(RecoveringBlockWritable.convert(b));
|
||||
}
|
||||
return new BlockRecoveryCommandWritable(blks);
|
||||
}
|
||||
}
|
@ -0,0 +1,73 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
||||
|
||||
/**
|
||||
* Class for translating DatanodeCommandWritable to and from DatanodeCommand.
|
||||
*/
|
||||
class DatanodeCommandHelper {
|
||||
public static final Log LOG = LogFactory.getLog(DatanodeCommandHelper.class);
|
||||
|
||||
private DatanodeCommandHelper() {
|
||||
/* Private constructor to prevent instantiation */
|
||||
}
|
||||
|
||||
static DatanodeCommand convert(DatanodeCommandWritable cmd) {
|
||||
return cmd.convert();
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a subclass of {@link DatanodeCommand} return the corresponding
|
||||
* writable type.
|
||||
*/
|
||||
static DatanodeCommandWritable convert(DatanodeCommand cmd) {
|
||||
switch (cmd.getAction()) {
|
||||
case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
|
||||
return BalancerBandwidthCommandWritable
|
||||
.convert((BalancerBandwidthCommand) cmd);
|
||||
|
||||
case DatanodeProtocol.DNA_FINALIZE:
|
||||
return FinalizeCommandWritable.convert((FinalizeCommand)cmd);
|
||||
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
|
||||
return KeyUpdateCommandWritable.convert((KeyUpdateCommand)cmd);
|
||||
case DatanodeProtocol.DNA_REGISTER:
|
||||
return RegisterCommandWritable.REGISTER;
|
||||
case DatanodeProtocol.DNA_TRANSFER:
|
||||
case DatanodeProtocol.DNA_INVALIDATE:
|
||||
return BlockCommandWritable.convert((BlockCommand)cmd);
|
||||
case UpgradeCommand.UC_ACTION_START_UPGRADE:
|
||||
return UpgradeCommandWritable.convert((UpgradeCommand)cmd);
|
||||
case DatanodeProtocol.DNA_RECOVERBLOCK:
|
||||
return BlockRecoveryCommandWritable.convert((BlockRecoveryCommand)cmd);
|
||||
default:
|
||||
LOG.warn("Unknown DatanodeCommand action - " + cmd.getAction());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
|
||||
/**
|
||||
* Base class for data-node command.
|
||||
* Issued by the name-node to notify data-nodes what should be done.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public abstract class DatanodeCommandWritable extends ServerCommandWritable {
|
||||
public DatanodeCommandWritable() {
|
||||
super();
|
||||
}
|
||||
|
||||
DatanodeCommandWritable(int action) {
|
||||
super(action);
|
||||
}
|
||||
|
||||
/** Method to convert from writable type to internal type */
|
||||
public abstract DatanodeCommand convert();
|
||||
|
||||
public static DatanodeCommandWritable[] convert(DatanodeCommand[] cmds) {
|
||||
DatanodeCommandWritable[] ret = new DatanodeCommandWritable[cmds.length];
|
||||
for (int i = 0; i < cmds.length; i++) {
|
||||
ret[i] = DatanodeCommandHelper.convert(cmds[i]);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
public static DatanodeCommand[] convert(DatanodeCommandWritable[] cmds) {
|
||||
if (cmds == null) return null;
|
||||
DatanodeCommand[] ret = new DatanodeCommand[cmds.length];
|
||||
for (int i = 0; i < cmds.length; i++) {
|
||||
ret[i] = cmds[i].convert();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
@ -0,0 +1,170 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeIDWritable;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.LocatedBlockWritable;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.NamespaceInfoWritable;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||
import org.apache.hadoop.ipc.ProtocolSignature;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
|
||||
/**
|
||||
* This class is used on the server side. Calls come across the wire for the
|
||||
* protocol family of Release 23 onwards. This class translates the R23 data
|
||||
* types to the native data types used inside the NN as specified in the generic
|
||||
* DatanodeProtocol.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Stable
|
||||
public class DatanodeProtocolServerSideTranslatorR23 implements
|
||||
DatanodeWireProtocol {
|
||||
final private DatanodeProtocol server;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param server - the NN server
|
||||
* @throws IOException
|
||||
*/
|
||||
public DatanodeProtocolServerSideTranslatorR23(DatanodeProtocol server)
|
||||
throws IOException {
|
||||
this.server = server;
|
||||
}
|
||||
|
||||
/**
|
||||
* The client side will redirect getProtocolSignature to
|
||||
* getProtocolSignature2.
|
||||
*
|
||||
* However the RPC layer below on the Server side will call getProtocolVersion
|
||||
* and possibly in the future getProtocolSignature. Hence we still implement
|
||||
* it even though the end client's call will never reach here.
|
||||
*/
|
||||
@Override
|
||||
public ProtocolSignature getProtocolSignature(String protocol,
|
||||
long clientVersion, int clientMethodsHash) throws IOException {
|
||||
/**
|
||||
* Don't forward this to the server. The protocol version and signature is
|
||||
* that of {@link DatanodeProtocol}
|
||||
*
|
||||
*/
|
||||
if (!protocol.equals(RPC.getProtocolName(DatanodeWireProtocol.class))) {
|
||||
throw new IOException("Namenode Serverside implements " +
|
||||
RPC.getProtocolName(DatanodeWireProtocol.class) +
|
||||
". The following requested protocol is unknown: " + protocol);
|
||||
}
|
||||
|
||||
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
|
||||
DatanodeWireProtocol.versionID, DatanodeWireProtocol.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProtocolSignatureWritable
|
||||
getProtocolSignature2(
|
||||
String protocol, long clientVersion, int clientMethodsHash)
|
||||
throws IOException {
|
||||
/**
|
||||
* Don't forward this to the server. The protocol version and signature is
|
||||
* that of {@link DatanodeProtocol}
|
||||
*/
|
||||
return ProtocolSignatureWritable.convert(
|
||||
this.getProtocolSignature(protocol, clientVersion, clientMethodsHash));
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getProtocolVersion(String protocol, long clientVersion)
|
||||
throws IOException {
|
||||
if (protocol.equals(RPC.getProtocolName(DatanodeWireProtocol.class))) {
|
||||
return DatanodeWireProtocol.versionID;
|
||||
}
|
||||
throw new IOException("Namenode Serverside implements " +
|
||||
RPC.getProtocolName(DatanodeWireProtocol.class) +
|
||||
". The following requested protocol is unknown: " + protocol);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatanodeRegistrationWritable registerDatanode(
|
||||
DatanodeRegistrationWritable registration) throws IOException {
|
||||
return DatanodeRegistrationWritable.convert(server
|
||||
.registerDatanode(registration.convert()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatanodeCommandWritable[] sendHeartbeat(
|
||||
DatanodeRegistrationWritable registration, long capacity, long dfsUsed,
|
||||
long remaining, long blockPoolUsed, int xmitsInProgress,
|
||||
int xceiverCount, int failedVolumes) throws IOException {
|
||||
return DatanodeCommandWritable.convert(server.sendHeartbeat(
|
||||
registration.convert(), capacity, dfsUsed, remaining, blockPoolUsed,
|
||||
xmitsInProgress, xceiverCount, failedVolumes));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatanodeCommandWritable blockReport(
|
||||
DatanodeRegistrationWritable registration, String poolId, long[] blocks)
|
||||
throws IOException {
|
||||
return DatanodeCommandHelper.convert(server.blockReport(
|
||||
registration.convert(), poolId, blocks));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void blockReceivedAndDeleted(
|
||||
DatanodeRegistrationWritable registration, String poolId,
|
||||
ReceivedDeletedBlockInfoWritable[] receivedAndDeletedBlocks)
|
||||
throws IOException {
|
||||
server.blockReceivedAndDeleted(registration.convert(), poolId,
|
||||
ReceivedDeletedBlockInfoWritable.convert(receivedAndDeletedBlocks));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void errorReport(DatanodeRegistrationWritable registration,
|
||||
int errorCode, String msg) throws IOException {
|
||||
server.errorReport(registration.convert(), errorCode, msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NamespaceInfoWritable versionRequest() throws IOException {
|
||||
return NamespaceInfoWritable.convert(server.versionRequest());
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpgradeCommandWritable processUpgradeCommand(
|
||||
UpgradeCommandWritable comm) throws IOException {
|
||||
return UpgradeCommandWritable.convert(server.processUpgradeCommand(comm.convert()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reportBadBlocks(LocatedBlockWritable[] blocks) throws IOException {
|
||||
server.reportBadBlocks(LocatedBlockWritable.convertLocatedBlock(blocks));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitBlockSynchronization(ExtendedBlockWritable block,
|
||||
long newgenerationstamp, long newlength, boolean closeFile,
|
||||
boolean deleteblock, DatanodeIDWritable[] newtargets) throws IOException {
|
||||
server.commitBlockSynchronization(
|
||||
ExtendedBlockWritable.convertExtendedBlock(block), newgenerationstamp,
|
||||
newlength, closeFile, deleteblock,
|
||||
DatanodeIDWritable.convertDatanodeID(newtargets));
|
||||
}
|
||||
}
|
@ -0,0 +1,193 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeIDWritable;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.LocatedBlockWritable;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.RetryProxy;
|
||||
import org.apache.hadoop.ipc.ProtocolSignature;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
/**
|
||||
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
|
||||
* while translating from the parameter types used in ClientProtocol to those
|
||||
* used in protocolR23Compatile.*.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Stable
|
||||
public class DatanodeProtocolTranslatorR23 implements
|
||||
DatanodeProtocol, Closeable {
|
||||
final private DatanodeWireProtocol rpcProxy;
|
||||
|
||||
private static DatanodeWireProtocol createNamenode(
|
||||
InetSocketAddress nameNodeAddr, Configuration conf,
|
||||
UserGroupInformation ugi) throws IOException {
|
||||
return RPC.getProxy(DatanodeWireProtocol.class,
|
||||
DatanodeWireProtocol.versionID, nameNodeAddr, ugi, conf,
|
||||
NetUtils.getSocketFactory(conf, DatanodeWireProtocol.class));
|
||||
}
|
||||
|
||||
/** Create a {@link NameNode} proxy */
|
||||
static DatanodeWireProtocol createNamenodeWithRetry(
|
||||
DatanodeWireProtocol rpcNamenode) {
|
||||
RetryPolicy createPolicy = RetryPolicies
|
||||
.retryUpToMaximumCountWithFixedSleep(5,
|
||||
HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
|
||||
|
||||
Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap =
|
||||
new HashMap<Class<? extends Exception>, RetryPolicy>();
|
||||
remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
|
||||
createPolicy);
|
||||
|
||||
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
|
||||
new HashMap<Class<? extends Exception>, RetryPolicy>();
|
||||
exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
|
||||
.retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
|
||||
remoteExceptionToPolicyMap));
|
||||
RetryPolicy methodPolicy = RetryPolicies.retryByException(
|
||||
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
|
||||
Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
|
||||
|
||||
methodNameToPolicyMap.put("create", methodPolicy);
|
||||
|
||||
return (DatanodeWireProtocol) RetryProxy.create(
|
||||
DatanodeWireProtocol.class, rpcNamenode, methodNameToPolicyMap);
|
||||
}
|
||||
|
||||
public DatanodeProtocolTranslatorR23(InetSocketAddress nameNodeAddr,
|
||||
Configuration conf) throws IOException {
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
rpcProxy = createNamenodeWithRetry(createNamenode(nameNodeAddr, conf, ugi));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
RPC.stopProxy(rpcProxy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getProtocolVersion(String protocolName, long clientVersion)
|
||||
throws IOException {
|
||||
return rpcProxy.getProtocolVersion(protocolName, clientVersion);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProtocolSignature getProtocolSignature(String protocol,
|
||||
long clientVersion, int clientMethodsHash) throws IOException {
|
||||
return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2(
|
||||
protocol, clientVersion, clientMethodsHash));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatanodeRegistration registerDatanode(DatanodeRegistration registration)
|
||||
throws IOException {
|
||||
return rpcProxy.registerDatanode(
|
||||
DatanodeRegistrationWritable.convert(registration)).convert();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
|
||||
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
|
||||
int xmitsInProgress, int xceiverCount, int failedVolumes)
|
||||
throws IOException {
|
||||
return DatanodeCommandWritable.convert(rpcProxy.sendHeartbeat(
|
||||
DatanodeRegistrationWritable.convert(registration), capacity,
|
||||
dfsUsed, remaining, blockPoolUsed, xmitsInProgress, xceiverCount,
|
||||
failedVolumes));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatanodeCommand blockReport(DatanodeRegistration registration,
|
||||
String poolId, long[] blocks) throws IOException {
|
||||
return rpcProxy.blockReport(
|
||||
DatanodeRegistrationWritable.convert(registration), poolId, blocks)
|
||||
.convert();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void blockReceivedAndDeleted(DatanodeRegistration registration,
|
||||
String poolId, ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks)
|
||||
throws IOException {
|
||||
rpcProxy.blockReceivedAndDeleted(
|
||||
DatanodeRegistrationWritable.convert(registration), poolId,
|
||||
ReceivedDeletedBlockInfoWritable.convert(receivedAndDeletedBlocks));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void errorReport(DatanodeRegistration registration, int errorCode,
|
||||
String msg) throws IOException {
|
||||
rpcProxy.errorReport(DatanodeRegistrationWritable.convert(registration),
|
||||
errorCode, msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NamespaceInfo versionRequest() throws IOException {
|
||||
return rpcProxy.versionRequest().convert();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpgradeCommand processUpgradeCommand(UpgradeCommand cmd)
|
||||
throws IOException {
|
||||
return rpcProxy.processUpgradeCommand(UpgradeCommandWritable.convert(cmd))
|
||||
.convert();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
|
||||
rpcProxy.reportBadBlocks(LocatedBlockWritable.convertLocatedBlock(blocks));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitBlockSynchronization(ExtendedBlock block,
|
||||
long newgenerationstamp, long newlength, boolean closeFile,
|
||||
boolean deleteblock, DatanodeID[] newtargets) throws IOException {
|
||||
rpcProxy.commitBlockSynchronization(
|
||||
ExtendedBlockWritable.convertExtendedBlock(block), newgenerationstamp,
|
||||
newlength, closeFile, deleteblock,
|
||||
DatanodeIDWritable.convertDatanodeID(newtargets));
|
||||
}
|
||||
}
|
@ -0,0 +1,113 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeIDWritable;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ExportedBlockKeysWritable;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.StorageInfoWritable;
|
||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableFactories;
|
||||
import org.apache.hadoop.io.WritableFactory;
|
||||
|
||||
/**
|
||||
* DatanodeRegistration class contains all information the name-node needs
|
||||
* to identify and verify a data-node when it contacts the name-node.
|
||||
* This information is sent by data-node with each communication request.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class DatanodeRegistrationWritable implements Writable {
|
||||
static { // register a ctor
|
||||
WritableFactories.setFactory
|
||||
(DatanodeRegistrationWritable.class,
|
||||
new WritableFactory() {
|
||||
public Writable newInstance() { return new DatanodeRegistrationWritable(); }
|
||||
});
|
||||
}
|
||||
|
||||
private DatanodeIDWritable datanodeId;
|
||||
private StorageInfoWritable storageInfo;
|
||||
private ExportedBlockKeysWritable exportedKeys;
|
||||
|
||||
/**
|
||||
* Default constructor.
|
||||
*/
|
||||
public DatanodeRegistrationWritable() {
|
||||
this("", new StorageInfo(), new ExportedBlockKeys());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create DatanodeRegistration
|
||||
*/
|
||||
public DatanodeRegistrationWritable(String nodeName, StorageInfo info,
|
||||
ExportedBlockKeys keys) {
|
||||
this.datanodeId = new DatanodeIDWritable(nodeName);
|
||||
this.storageInfo = StorageInfoWritable.convert(info);
|
||||
this.exportedKeys = ExportedBlockKeysWritable.convert(keys);
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////
|
||||
// Writable
|
||||
/////////////////////////////////////////////////
|
||||
/** {@inheritDoc} */
|
||||
public void write(DataOutput out) throws IOException {
|
||||
datanodeId.write(out);
|
||||
|
||||
//TODO: move it to DatanodeID once HADOOP-2797 has been committed
|
||||
out.writeShort(datanodeId.ipcPort);
|
||||
|
||||
storageInfo.write(out);
|
||||
exportedKeys.write(out);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
datanodeId.readFields(in);
|
||||
|
||||
//TODO: move it to DatanodeID once HADOOP-2797 has been committed
|
||||
datanodeId.ipcPort = in.readShort() & 0x0000ffff;
|
||||
|
||||
storageInfo.readFields(in);
|
||||
exportedKeys.readFields(in);
|
||||
}
|
||||
|
||||
public DatanodeRegistration convert() {
|
||||
DatanodeRegistration dnReg = new DatanodeRegistration(datanodeId.name,
|
||||
storageInfo.convert(), exportedKeys.convert());
|
||||
dnReg.setIpcPort(datanodeId.ipcPort);
|
||||
return dnReg;
|
||||
}
|
||||
|
||||
public static DatanodeRegistrationWritable convert(DatanodeRegistration dnReg) {
|
||||
if (dnReg == null) return null;
|
||||
DatanodeRegistrationWritable ret = new DatanodeRegistrationWritable(
|
||||
dnReg.getName(), dnReg.storageInfo, dnReg.exportedKeys);
|
||||
ret.datanodeId.ipcPort = dnReg.ipcPort;
|
||||
return ret;
|
||||
}
|
||||
}
|
@ -0,0 +1,183 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.avro.reflect.Nullable;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeIDWritable;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.LocatedBlockWritable;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.NamespaceInfoWritable;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
|
||||
import org.apache.hadoop.ipc.VersionedProtocol;
|
||||
import org.apache.hadoop.security.KerberosInfo;
|
||||
|
||||
/**********************************************************************
|
||||
* Protocol that a DFS datanode uses to communicate with the NameNode.
|
||||
* It's used to upload current load information and block reports.
|
||||
*
|
||||
* The only way a NameNode can communicate with a DataNode is by
|
||||
* returning values from these functions.
|
||||
*
|
||||
**********************************************************************/
|
||||
@KerberosInfo(
|
||||
serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY,
|
||||
clientPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
|
||||
@InterfaceAudience.Private
|
||||
public interface DatanodeWireProtocol extends VersionedProtocol {
|
||||
/**
|
||||
* The rules for changing this protocol are the same as that for
|
||||
* {@link ClientNamenodeWireProtocol} - see that java file for details.
|
||||
*/
|
||||
public static final long versionID = 28L;
|
||||
|
||||
// error code
|
||||
final static int NOTIFY = 0;
|
||||
final static int DISK_ERROR = 1; // there are still valid volumes on DN
|
||||
final static int INVALID_BLOCK = 2;
|
||||
final static int FATAL_DISK_ERROR = 3; // no valid volumes left on DN
|
||||
|
||||
/**
|
||||
* Determines actions that data node should perform
|
||||
* when receiving a datanode command.
|
||||
*/
|
||||
final static int DNA_UNKNOWN = 0; // unknown action
|
||||
final static int DNA_TRANSFER = 1; // transfer blocks to another datanode
|
||||
final static int DNA_INVALIDATE = 2; // invalidate blocks
|
||||
final static int DNA_SHUTDOWN = 3; // shutdown node
|
||||
final static int DNA_REGISTER = 4; // re-register
|
||||
final static int DNA_FINALIZE = 5; // finalize previous upgrade
|
||||
final static int DNA_RECOVERBLOCK = 6; // request a block recovery
|
||||
final static int DNA_ACCESSKEYUPDATE = 7; // update access key
|
||||
final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
|
||||
|
||||
/**
|
||||
* Register Datanode.
|
||||
* @return updated {@link DatanodeRegistrationWritable}, which contains
|
||||
* new storageID if the datanode did not have one and
|
||||
* registration ID for further communication.
|
||||
*/
|
||||
public DatanodeRegistrationWritable registerDatanode(
|
||||
DatanodeRegistrationWritable registration) throws IOException;
|
||||
/**
|
||||
* sendHeartbeat() tells the NameNode that the DataNode is still
|
||||
* alive and well. Includes some status info, too.
|
||||
* It also gives the NameNode a chance to return
|
||||
* an array of "DatanodeCommand" objects.
|
||||
* A DatanodeCommand tells the DataNode to invalidate local block(s),
|
||||
* or to copy them to other DataNodes, etc.
|
||||
* @param registration datanode registration information
|
||||
* @param capacity total storage capacity available at the datanode
|
||||
* @param dfsUsed storage used by HDFS
|
||||
* @param remaining remaining storage available for HDFS
|
||||
* @param blockPoolUsed storage used by the block pool
|
||||
* @param xmitsInProgress number of transfers from this datanode to others
|
||||
* @param xceiverCount number of active transceiver threads
|
||||
* @param failedVolumes number of failed volumes
|
||||
* @throws IOException on error
|
||||
*/
|
||||
@Nullable
|
||||
public DatanodeCommandWritable[] sendHeartbeat(
|
||||
DatanodeRegistrationWritable registration, long capacity, long dfsUsed,
|
||||
long remaining, long blockPoolUsed, int xmitsInProgress,
|
||||
int xceiverCount, int failedVolumes) throws IOException;
|
||||
|
||||
/**
|
||||
* blockReport() tells the NameNode about all the locally-stored blocks.
|
||||
* The NameNode returns an array of Blocks that have become obsolete
|
||||
* and should be deleted. This function is meant to upload *all*
|
||||
* the locally-stored blocks. It's invoked upon startup and then
|
||||
* infrequently afterwards.
|
||||
* @param registration
|
||||
* @param poolId - the block pool ID for the blocks
|
||||
* @param blocks - the block list as an array of longs.
|
||||
* Each block is represented as 2 longs.
|
||||
* This is done instead of Block[] to reduce memory used by block reports.
|
||||
*
|
||||
* @return - the next command for DN to process.
|
||||
* @throws IOException
|
||||
*/
|
||||
public DatanodeCommandWritable blockReport(
|
||||
DatanodeRegistrationWritable registration, String poolId, long[] blocks)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* blockReceivedAndDeleted() allows the DataNode to tell the NameNode about
|
||||
* recently-received and -deleted block data.
|
||||
*
|
||||
* For the case of received blocks, a hint for preferred replica to be
|
||||
* deleted when there is any excessive blocks is provided.
|
||||
* For example, whenever client code
|
||||
* writes a new Block here, or another DataNode copies a Block to
|
||||
* this DataNode, it will call blockReceived().
|
||||
*/
|
||||
public void blockReceivedAndDeleted(
|
||||
DatanodeRegistrationWritable registration, String poolId,
|
||||
ReceivedDeletedBlockInfoWritable[] receivedAndDeletedBlocks)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* errorReport() tells the NameNode about something that has gone
|
||||
* awry. Useful for debugging.
|
||||
*/
|
||||
public void errorReport(DatanodeRegistrationWritable registration,
|
||||
int errorCode, String msg) throws IOException;
|
||||
|
||||
public NamespaceInfoWritable versionRequest() throws IOException;
|
||||
|
||||
/**
|
||||
* This is a very general way to send a command to the name-node during
|
||||
* distributed upgrade process.
|
||||
*
|
||||
* The generosity is because the variety of upgrade commands is unpredictable.
|
||||
* The reply from the name-node is also received in the form of an upgrade
|
||||
* command.
|
||||
*
|
||||
* @return a reply in the form of an upgrade command
|
||||
*/
|
||||
UpgradeCommandWritable processUpgradeCommand(UpgradeCommandWritable comm)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* same as {@link ClientProtocol#reportBadBlocks(LocatedBlock[])}
|
||||
* }
|
||||
*/
|
||||
public void reportBadBlocks(LocatedBlockWritable[] blocks) throws IOException;
|
||||
|
||||
/**
|
||||
* Commit block synchronization in lease recovery
|
||||
*/
|
||||
public void commitBlockSynchronization(ExtendedBlockWritable block,
|
||||
long newgenerationstamp, long newlength, boolean closeFile,
|
||||
boolean deleteblock, DatanodeIDWritable[] newtargets) throws IOException;
|
||||
|
||||
/**
|
||||
* This method is defined to get the protocol signature using
|
||||
* the R23 protocol - hence we have added the suffix of 2 the method name
|
||||
* to avoid conflict.
|
||||
*/
|
||||
public ProtocolSignatureWritable getProtocolSignature2(String protocol,
|
||||
long clientVersion, int clientMethodsHash) throws IOException;
|
||||
}
|
@ -0,0 +1,88 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableFactories;
|
||||
import org.apache.hadoop.io.WritableFactory;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
|
||||
/**
|
||||
* A FinalizeCommand is an instruction from namenode to finalize the previous
|
||||
* upgrade to a datanode
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class FinalizeCommandWritable extends DatanodeCommandWritable {
|
||||
// /////////////////////////////////////////
|
||||
// Writable
|
||||
// /////////////////////////////////////////
|
||||
static { // register a ctor
|
||||
WritableFactories.setFactory(FinalizeCommandWritable.class,
|
||||
new WritableFactory() {
|
||||
public Writable newInstance() {
|
||||
return new FinalizeCommandWritable();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
String blockPoolId;
|
||||
|
||||
private FinalizeCommandWritable() {
|
||||
this(null);
|
||||
}
|
||||
|
||||
public FinalizeCommandWritable(String bpid) {
|
||||
super(DatanodeWireProtocol.DNA_FINALIZE);
|
||||
blockPoolId = bpid;
|
||||
}
|
||||
|
||||
public String getBlockPoolId() {
|
||||
return blockPoolId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
blockPoolId = WritableUtils.readString(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
WritableUtils.writeString(out, blockPoolId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatanodeCommand convert() {
|
||||
return new FinalizeCommand(blockPoolId);
|
||||
}
|
||||
|
||||
public static FinalizeCommandWritable convert(FinalizeCommand cmd) {
|
||||
if (cmd == null) {
|
||||
return null;
|
||||
}
|
||||
return new FinalizeCommandWritable(cmd.getBlockPoolId());
|
||||
}
|
||||
}
|
@ -0,0 +1,87 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.ExportedBlockKeysWritable;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableFactories;
|
||||
import org.apache.hadoop.io.WritableFactory;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class KeyUpdateCommandWritable extends DatanodeCommandWritable {
|
||||
private ExportedBlockKeysWritable keys;
|
||||
|
||||
KeyUpdateCommandWritable() {
|
||||
this(new ExportedBlockKeysWritable());
|
||||
}
|
||||
|
||||
public KeyUpdateCommandWritable(ExportedBlockKeysWritable keys) {
|
||||
super(DatanodeWireProtocol.DNA_ACCESSKEYUPDATE);
|
||||
this.keys = keys;
|
||||
}
|
||||
|
||||
public ExportedBlockKeysWritable getExportedKeys() {
|
||||
return this.keys;
|
||||
}
|
||||
|
||||
// ///////////////////////////////////////////////
|
||||
// Writable
|
||||
// ///////////////////////////////////////////////
|
||||
static { // register a ctor
|
||||
WritableFactories.setFactory(KeyUpdateCommandWritable.class,
|
||||
new WritableFactory() {
|
||||
public Writable newInstance() {
|
||||
return new KeyUpdateCommandWritable();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
super.write(out);
|
||||
keys.write(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
super.readFields(in);
|
||||
keys.readFields(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatanodeCommand convert() {
|
||||
return new KeyUpdateCommand(keys.convert());
|
||||
}
|
||||
|
||||
public static KeyUpdateCommandWritable convert(KeyUpdateCommand cmd) {
|
||||
if (cmd == null) {
|
||||
return null;
|
||||
}
|
||||
return new KeyUpdateCommandWritable(ExportedBlockKeysWritable.convert(cmd
|
||||
.getExportedKeys()));
|
||||
}
|
||||
}
|
@ -0,0 +1,95 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocolR23Compatible.BlockWritable;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
/**
|
||||
* A data structure to store Block and delHints together, used to send
|
||||
* received/deleted ACKs.
|
||||
*/
|
||||
public class ReceivedDeletedBlockInfoWritable implements Writable {
|
||||
BlockWritable block;
|
||||
String delHints;
|
||||
|
||||
public final static String TODELETE_HINT = "-";
|
||||
|
||||
public ReceivedDeletedBlockInfoWritable() {
|
||||
}
|
||||
|
||||
public ReceivedDeletedBlockInfoWritable(BlockWritable blk, String delHints) {
|
||||
this.block = blk;
|
||||
this.delHints = delHints;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
this.block.write(out);
|
||||
Text.writeString(out, this.delHints);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
this.block = new BlockWritable();
|
||||
this.block.readFields(in);
|
||||
this.delHints = Text.readString(in);
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return block.toString() + ", delHint: " + delHints;
|
||||
}
|
||||
|
||||
public static ReceivedDeletedBlockInfo[] convert(
|
||||
ReceivedDeletedBlockInfoWritable[] rdBlocks) {
|
||||
ReceivedDeletedBlockInfo[] ret =
|
||||
new ReceivedDeletedBlockInfo[rdBlocks.length];
|
||||
for (int i = 0; i < rdBlocks.length; i++) {
|
||||
ret[i] = rdBlocks[i].convert();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
public static ReceivedDeletedBlockInfoWritable[] convert(
|
||||
ReceivedDeletedBlockInfo[] blocks) {
|
||||
ReceivedDeletedBlockInfoWritable[] ret =
|
||||
new ReceivedDeletedBlockInfoWritable[blocks.length];
|
||||
for (int i = 0; i < blocks.length; i++) {
|
||||
ret[i] = convert(blocks[i]);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
public ReceivedDeletedBlockInfo convert() {
|
||||
return new ReceivedDeletedBlockInfo(block.convert(), delHints);
|
||||
}
|
||||
|
||||
public static ReceivedDeletedBlockInfoWritable convert(
|
||||
ReceivedDeletedBlockInfo b) {
|
||||
if (b == null) return null;
|
||||
return new ReceivedDeletedBlockInfoWritable(BlockWritable.convert(b
|
||||
.getBlock()), b.getDelHints());
|
||||
}
|
||||
}
|
@ -0,0 +1,69 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableFactories;
|
||||
import org.apache.hadoop.io.WritableFactory;
|
||||
|
||||
/**
|
||||
* A RegisterCommand is an instruction to a datanode to register with the
|
||||
* namenode.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class RegisterCommandWritable extends DatanodeCommandWritable {
|
||||
public static final RegisterCommandWritable REGISTER =
|
||||
new RegisterCommandWritable();
|
||||
|
||||
// /////////////////////////////////////////
|
||||
// Writable
|
||||
// /////////////////////////////////////////
|
||||
static { // register a ctor
|
||||
WritableFactories.setFactory(RegisterCommandWritable.class,
|
||||
new WritableFactory() {
|
||||
public Writable newInstance() {
|
||||
return new RegisterCommandWritable();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public RegisterCommandWritable() {
|
||||
super(DatanodeWireProtocol.DNA_REGISTER);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) { /* Nothing to read */
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) { /* Nothing to write */
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatanodeCommand convert() {
|
||||
return RegisterCommand.REGISTER;
|
||||
}
|
||||
}
|
@ -0,0 +1,75 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
/**
|
||||
* Base class for a server command.
|
||||
* Issued by the name-node to notify other servers what should be done.
|
||||
* Commands are defined by actions defined in respective protocols.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public abstract class ServerCommandWritable implements Writable {
|
||||
private int action;
|
||||
|
||||
/**
|
||||
* Unknown server command constructor.
|
||||
* Creates a command with action 0.
|
||||
*/
|
||||
public ServerCommandWritable() {
|
||||
this(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a command for the specified action.
|
||||
* Actions are protocol specific.
|
||||
* @param action
|
||||
*/
|
||||
public ServerCommandWritable(int action) {
|
||||
this.action = action;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get server command action.
|
||||
* @return action code.
|
||||
*/
|
||||
public int getAction() {
|
||||
return this.action;
|
||||
}
|
||||
|
||||
///////////////////////////////////////////
|
||||
// Writable
|
||||
///////////////////////////////////////////
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
out.writeInt(this.action);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
this.action = in.readInt();
|
||||
}
|
||||
}
|
@ -0,0 +1,106 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.protocolR23Compatible;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableFactories;
|
||||
import org.apache.hadoop.io.WritableFactory;
|
||||
|
||||
/**
|
||||
* This as a generic distributed upgrade command.
|
||||
*
|
||||
* During the upgrade cluster components send upgrade commands to each other
|
||||
* in order to obtain or share information with them.
|
||||
* It is supposed that each upgrade defines specific upgrade command by
|
||||
* deriving them from this class.
|
||||
* The upgrade command contains version of the upgrade, which is verified
|
||||
* on the receiving side and current status of the upgrade.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class UpgradeCommandWritable extends DatanodeCommandWritable {
|
||||
final static int UC_ACTION_UNKNOWN = DatanodeWireProtocol.DNA_UNKNOWN;
|
||||
public final static int UC_ACTION_REPORT_STATUS = 100; // report upgrade status
|
||||
public final static int UC_ACTION_START_UPGRADE = 101; // start upgrade
|
||||
|
||||
private int version;
|
||||
private short upgradeStatus;
|
||||
|
||||
public UpgradeCommandWritable() {
|
||||
super(UC_ACTION_UNKNOWN);
|
||||
this.version = 0;
|
||||
this.upgradeStatus = 0;
|
||||
}
|
||||
|
||||
public UpgradeCommandWritable(int action, int version, short status) {
|
||||
super(action);
|
||||
this.version = version;
|
||||
this.upgradeStatus = status;
|
||||
}
|
||||
|
||||
public int getVersion() {
|
||||
return this.version;
|
||||
}
|
||||
|
||||
public short getCurrentStatus() {
|
||||
return this.upgradeStatus;
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////
|
||||
// Writable
|
||||
/////////////////////////////////////////////////
|
||||
static { // register a ctor
|
||||
WritableFactories.setFactory
|
||||
(UpgradeCommandWritable.class,
|
||||
new WritableFactory() {
|
||||
public Writable newInstance() { return new UpgradeCommandWritable(); }
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
super.write(out);
|
||||
out.writeInt(this.version);
|
||||
out.writeShort(this.upgradeStatus);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
super.readFields(in);
|
||||
this.version = in.readInt();
|
||||
this.upgradeStatus = in.readShort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpgradeCommand convert() {
|
||||
return new UpgradeCommand(getAction(), version, upgradeStatus);
|
||||
}
|
||||
|
||||
public static UpgradeCommandWritable convert(UpgradeCommand cmd) {
|
||||
if (cmd == null) return null;
|
||||
return new UpgradeCommandWritable(cmd.getAction(), cmd.getVersion(),
|
||||
cmd.getCurrentStatus());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user