diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index af5af3e648..672b5e7a0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -78,6 +78,8 @@ Trunk (unreleased changes)
     HDFS-2489. Move Finalize and Register to separate file out of
     DatanodeCommand.java. (suresh)
 
+    HDFS-2488. Separate datatypes for InterDatanodeProtocol. (suresh)
+
   BUG FIXES
 
     HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/BlockWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/BlockWritable.java
index e218f7ab50..7518abaea9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/BlockWritable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/BlockWritable.java
@@ -42,6 +42,7 @@ public class BlockWritable implements Writable {
        });
   }
 
+
   private long blockId;
   private long numBytes;
   private long generationStamp;
@@ -79,4 +80,16 @@ public static BlockWritable convert(Block b) {
   public Block convert() {
     return new Block(blockId, numBytes, generationStamp);
   }
+
+  public long getBlockId() {
+    return blockId;
+  }
+
+  public long getNumBytes() {
+    return numBytes;
+  }
+
+  public long getGenerationStamp() {
+    return generationStamp;
+  }
 }
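
The three getters added above expose BlockWritable's fields without widening its mutable surface. A minimal sketch (illustrative only, not part of the patch) of the convert round trip they make easy to verify; Block's three-argument constructor is the one visible in convert() above:

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocolR23Compatible.BlockWritable;

    public class BlockWritableRoundTrip {
      public static void main(String[] args) {
        // Internal type -> wire type; field values must survive unchanged.
        Block b = new Block(1L, 1024L, 100L);  // blockId, numBytes, generationStamp
        BlockWritable w = BlockWritable.convert(b);
        assert w.getBlockId() == 1L;           // getters added by this patch
        assert w.getNumBytes() == 1024L;
        assert w.getGenerationStamp() == 100L;
        // Wire type -> internal type completes the round trip.
        Block back = w.convert();
        assert back.equals(b);
      }
    }
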
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 61c1cbec18..fea3928e67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -158,6 +158,9 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolServerSideTranslatorR23;
+import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolTranslatorR23;
+import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeWireProtocol;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpServer;
@@ -589,7 +592,11 @@ private void initIpcServer(Configuration conf) throws IOException {
         conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
             DFS_DATANODE_HANDLER_COUNT_DEFAULT),
         false, conf, blockPoolTokenSecretManager);
-    ipcServer.addProtocol(InterDatanodeProtocol.class, this);
+    InterDatanodeProtocolServerSideTranslatorR23
+        interDatanodeProtocolServerTranslator =
+            new InterDatanodeProtocolServerSideTranslatorR23(this);
+    ipcServer.addProtocol(InterDatanodeWireProtocol.class,
+        interDatanodeProtocolServerTranslator);
 
     // set service-level authorization security policy
     if (conf.getBoolean(
@@ -1641,15 +1648,13 @@ public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
     if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
       InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr);
     }
-    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
     try {
       return loginUgi
           .doAs(new PrivilegedExceptionAction<InterDatanodeProtocol>() {
             public InterDatanodeProtocol run() throws IOException {
-              return (InterDatanodeProtocol) RPC.getProxy(
-                  InterDatanodeProtocol.class, InterDatanodeProtocol.versionID,
-                  addr, UserGroupInformation.getCurrentUser(), conf,
-                  NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+              return new InterDatanodeProtocolTranslatorR23(addr, loginUgi,
+                  conf, NetUtils.getDefaultSocketFactory(conf), socketTimeout);
             }
           });
     } catch (InterruptedException ie) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
index 5ecdca7b79..117663cd86 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.util.MBeans;
@@ -1263,8 +1262,8 @@ public synchronized File getBlockFile(String bpid, Block b) throws IOException {
     File f = validateBlockFile(bpid, b);
     if(f == null) {
-      if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
-        InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
+      if (DataNode.LOG.isDebugEnabled()) {
+        DataNode.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
       }
       throw new IOException("Block " + b + " is not valid.");
     }
@@ -2003,8 +2002,8 @@ File validateBlockFile(String bpid, Block b) throws IOException {
       datanode.checkDiskError();
     }
 
-    if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
-      InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("b=" + b + ", f=" + f);
     }
     return null;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
index 1c62a60595..7d4808807e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
@@ -25,7 +25,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeWireProtocol;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.KerberosInfo;
 
@@ -39,6 +41,23 @@ public interface InterDatanodeProtocol extends VersionedProtocol {
   public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);
 
   /**
+   * Until version 6L, this class InterDatanodeProtocol served as both
+   * the interface to the DN AND the RPC protocol used to communicate with
+   * the DN.
+   *
+   * Post version 6L (release 23 of Hadoop), the protocol is implemented in
+   * {@literal ../protocolR23Compatible/InterDatanodeWireProtocol}
+   *
+   * This class is used by the DN to insulate itself from the protocol
+   * serialization.
+   *
+   * If you are adding to or changing the DN's interface then you need to
+   * change both this class and ALSO
+   * {@link InterDatanodeWireProtocol}.
+   * These changes need to be done in a compatible fashion as described in
+   * {@link ClientNamenodeWireProtocol}
+   *
+   * The log of historical changes can be retrieved from the svn.
    * 6: Add block pool ID to Block
    */
   public static final long versionID = 6L;
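
The javadoc above is the working rule for future edits: the internal interface and the wire protocol must evolve in lock step. A hypothetical sketch of what a compatible addition would involve; the method name exampleNewMethod is invented purely for illustration and exists nowhere in this patch:

    // 1. InterDatanodeProtocol (internal types) gains:
    //      long exampleNewMethod(ExtendedBlock b) throws IOException;
    // 2. InterDatanodeWireProtocol (wire types) gains the twin, and its
    //    versionID is bumped with a new change-log line in the javadoc:
    //      long exampleNewMethod(ExtendedBlockWritable b) throws IOException;
    // 3. InterDatanodeProtocolTranslatorR23 (client side) converts
    //    internal -> wire using the existing convert helpers:
    //      return rpcProxy.exampleNewMethod(
    //          ExtendedBlockWritable.convertExtendedBlock(b));
    // 4. InterDatanodeProtocolServerSideTranslatorR23 (server side) converts
    //    wire -> internal and delegates:
    //      return server.exampleNewMethod(
    //          ExtendedBlockWritable.convertExtendedBlock(b));
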
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/InterDatanodeProtocolServerSideTranslatorR23.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/InterDatanodeProtocolServerSideTranslatorR23.java
new file mode 100644
index 0000000000..9b6c63b396
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/InterDatanodeProtocolServerSideTranslatorR23.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocolR23Compatible;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+
+/**
+ * This class is used on the server side. Calls come across the wire for the
+ * protocol family of Release 23 onwards. This class translates the R23 data
+ * types to the internal data types used inside the DN as specified in the
+ * generic InterDatanodeProtocol.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class InterDatanodeProtocolServerSideTranslatorR23 implements
+    InterDatanodeWireProtocol {
+  final private InterDatanodeProtocol server;
+
+  /**
+   * @param server - the DN server implementing InterDatanodeProtocol
+   * @throws IOException
+   */
+  public InterDatanodeProtocolServerSideTranslatorR23(
+      InterDatanodeProtocol server) throws IOException {
+    this.server = server;
+  }
+
+  /**
+   * The client side will redirect getProtocolSignature to
+   * getProtocolSignature2.
+   *
+   * However the RPC layer below on the server side will call
+   * getProtocolVersion and possibly in the future getProtocolSignature.
+   * Hence we still implement it even though the end client's call will
+   * never reach here.
+   */
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    /**
+     * Don't forward this to the server. The protocol version and signature
+     * are those of {@link InterDatanodeWireProtocol}.
+     */
+    if (!protocol.equals(RPC.getProtocolName(InterDatanodeWireProtocol.class))) {
+      throw new IOException("Datanode Serverside implements "
+          + InterDatanodeWireProtocol.class
+          + ". The following requested protocol is unknown: " + protocol);
+    }
+
+    return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+        InterDatanodeWireProtocol.versionID, InterDatanodeWireProtocol.class);
+  }
+
+  @Override
+  public ProtocolSignatureWritable getProtocolSignature2(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    /**
+     * Don't forward this to the server. The protocol version and signature
+     * are those of {@link InterDatanodeWireProtocol}.
+     */
+    return ProtocolSignatureWritable.convert(this.getProtocolSignature(
+        protocol, clientVersion, clientMethodsHash));
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    if (protocol.equals(RPC.getProtocolName(InterDatanodeWireProtocol.class))) {
+      return InterDatanodeWireProtocol.versionID;
+    }
+    throw new IOException("Datanode Serverside implements "
+        + InterDatanodeWireProtocol.class
+        + ". The following requested protocol is unknown: " + protocol);
+  }
+
+  @Override
+  public ReplicaRecoveryInfoWritable initReplicaRecovery(
+      RecoveringBlockWritable rBlock) throws IOException {
+    return ReplicaRecoveryInfoWritable.convert(server
+        .initReplicaRecovery(rBlock.convert()));
+  }
+
+  @Override
+  public ExtendedBlockWritable updateReplicaUnderRecovery(
+      ExtendedBlockWritable oldBlock, long recoveryId, long newLength)
+      throws IOException {
+    ExtendedBlock b = ExtendedBlockWritable.convertExtendedBlock(oldBlock);
+    return ExtendedBlockWritable.convertExtendedBlock(server
+        .updateReplicaUnderRecovery(b, recoveryId, newLength));
+  }
+}
\ No newline at end of file
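
The server-side translator never forwards version negotiation to the DataNode; it answers for InterDatanodeWireProtocol itself. Wiring it up takes two statements on the IPC server, a condensed restatement of the DataNode.initIpcServer hunk earlier in this patch (here "this" is the DataNode and ipcServer is its RPC server):

    // Wrap the local InterDatanodeProtocol implementation (the DataNode)
    // and expose only the wire protocol on the IPC server.
    InterDatanodeProtocolServerSideTranslatorR23 serverSideTranslator =
        new InterDatanodeProtocolServerSideTranslatorR23(this);
    ipcServer.addProtocol(InterDatanodeWireProtocol.class, serverSideTranslator);
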
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/InterDatanodeProtocolTranslatorR23.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/InterDatanodeProtocolTranslatorR23.java
new file mode 100644
index 0000000000..730ec1568d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/InterDatanodeProtocolTranslatorR23.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocolR23Compatible;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * This class forwards InterDatanodeProtocol calls as RPC to the DN server
+ * while translating from the parameter types used in InterDatanodeProtocol
+ * to those used in protocolR23Compatible.*.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class InterDatanodeProtocolTranslatorR23 implements
+    InterDatanodeProtocol {
+
+  final private InterDatanodeWireProtocol rpcProxy;
+
+  /** used for testing */
+  public InterDatanodeProtocolTranslatorR23(InetSocketAddress addr,
+      UserGroupInformation ugi, Configuration conf, SocketFactory factory,
+      int socketTimeout) throws IOException {
+    rpcProxy = createInterDatanodeProtocolProxy(addr, ugi, conf, factory,
+        socketTimeout);
+  }
+
+  static InterDatanodeWireProtocol createInterDatanodeProtocolProxy(
+      InetSocketAddress addr, UserGroupInformation ugi, Configuration conf,
+      SocketFactory factory, int socketTimeout) throws IOException {
+    return RPC.getProxy(InterDatanodeWireProtocol.class,
+        InterDatanodeWireProtocol.versionID, addr, ugi, conf, factory,
+        socketTimeout);
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocolName,
+      long clientVersion, int clientMethodHash) throws IOException {
+    return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2(
+        protocolName, clientVersion, clientMethodHash));
+  }
+
+  @Override
+  public long getProtocolVersion(String protocolName, long clientVersion)
+      throws IOException {
+    return rpcProxy.getProtocolVersion(protocolName, clientVersion);
+  }
+
+  @Override
+  public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+      throws IOException {
+    return rpcProxy
+        .initReplicaRecovery(RecoveringBlockWritable.convert(rBlock)).convert();
+  }
+
+  @Override
+  public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
+      long recoveryId, long newLength) throws IOException {
+    ExtendedBlockWritable eb = ExtendedBlockWritable
+        .convertExtendedBlock(oldBlock);
+    ExtendedBlockWritable b = rpcProxy.updateReplicaUnderRecovery(eb,
+        recoveryId, newLength);
+    return ExtendedBlockWritable.convertExtendedBlock(b);
+  }
+}
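
Seen from a caller, the translator is just an InterDatanodeProtocol. A hedged usage sketch (illustrative only; the helper name startRecovery and its surrounding class are invented, the constructor and method calls are the ones defined above):

    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
    import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
    import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
    import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolTranslatorR23;
    import org.apache.hadoop.net.NetUtils;
    import org.apache.hadoop.security.UserGroupInformation;

    class InterDatanodeClientSketch {
      // Callers keep programming against InterDatanodeProtocol; the translator
      // converts every argument to a *Writable wire type and back.
      static ReplicaRecoveryInfo startRecovery(InetSocketAddress dnAddr,
          Configuration conf, RecoveringBlock rBlock, int socketTimeout)
          throws Exception {
        InterDatanodeProtocol proxy = new InterDatanodeProtocolTranslatorR23(
            dnAddr, UserGroupInformation.getLoginUser(), conf,
            NetUtils.getDefaultSocketFactory(conf), socketTimeout);
        return proxy.initReplicaRecovery(rBlock);
      }
    }
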
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/InterDatanodeWireProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/InterDatanodeWireProtocol.java
new file mode 100644
index 0000000000..40ad845f9b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/InterDatanodeWireProtocol.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocolR23Compatible;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * An inter-datanode protocol for updating generation stamp.
+ */
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY,
+    clientPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
+@InterfaceAudience.Private
+public interface InterDatanodeWireProtocol extends VersionedProtocol {
+  public static final Log LOG =
+      LogFactory.getLog(InterDatanodeWireProtocol.class);
+
+  /**
+   * The rules for changing this protocol are the same as those for
+   * {@link ClientNamenodeWireProtocol} - see that java file for details.
+   * 6: Add block pool ID to Block
+   */
+  public static final long versionID = 6L;
+
+  /**
+   * Initialize a replica recovery.
+   *
+   * @return actual state of the replica on this data-node or
+   *         null if data-node does not have the replica.
+   */
+  ReplicaRecoveryInfoWritable initReplicaRecovery(
+      RecoveringBlockWritable rBlock) throws IOException;
+
+  /**
+   * Update replica with the new generation stamp and length.
+   */
+  ExtendedBlockWritable updateReplicaUnderRecovery(
+      ExtendedBlockWritable oldBlock, long recoveryId, long newLength)
+      throws IOException;
+
+  /**
+   * This method is defined to get the protocol signature using
+   * the R23 protocol - hence we have added the suffix of 2 to the method
+   * name to avoid conflict.
+   */
+  public ProtocolSignatureWritable getProtocolSignature2(
+      String protocol, long clientVersion, int clientMethodsHash)
+      throws IOException;
+}
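
The versionID above is what the two ends compare when a connection is set up. A hedged sketch of an explicit check a caller could make; rpcProxy is assumed to be an InterDatanodeWireProtocol proxy obtained via RPC.getProxy as in the translator above:

    // Ask the server which version of the wire protocol it speaks.
    long serverVersion = rpcProxy.getProtocolVersion(
        RPC.getProtocolName(InterDatanodeWireProtocol.class),
        InterDatanodeWireProtocol.versionID);
    if (serverVersion != InterDatanodeWireProtocol.versionID) {
      throw new IOException("Incompatible InterDatanodeWireProtocol: expected "
          + InterDatanodeWireProtocol.versionID + ", got " + serverVersion);
    }
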
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/RecoveringBlockWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/RecoveringBlockWritable.java
new file mode 100644
index 0000000000..0324f265c1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/RecoveringBlockWritable.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocolR23Compatible;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocolR23Compatible.DatanodeInfoWritable;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable;
+import org.apache.hadoop.hdfs.protocolR23Compatible.LocatedBlockWritable;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * This is a block with locations from which it should be recovered and the
+ * new generation stamp, which the block will have after successful recovery.
+ *
+ * The new generation stamp of the block also plays the role of the
+ * recovery id.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RecoveringBlockWritable implements Writable {
+  private long newGenerationStamp;
+  private LocatedBlockWritable locatedBlock;
+
+  /**
+   * Create empty RecoveringBlock.
+   */
+  public RecoveringBlockWritable() {
+    locatedBlock = new LocatedBlockWritable();
+    newGenerationStamp = -1L;
+  }
+
+  /**
+   * Create RecoveringBlock.
+   */
+  public RecoveringBlockWritable(ExtendedBlockWritable b,
+      DatanodeInfoWritable[] locs, long newGS) {
+    locatedBlock = new LocatedBlockWritable(b, locs, -1, false);
+    this.newGenerationStamp = newGS;
+  }
+
+  // /////////////////////////////////////////
+  // Writable
+  // /////////////////////////////////////////
+  static { // register a ctor
+    WritableFactories.setFactory(RecoveringBlockWritable.class,
+        new WritableFactory() {
+          public Writable newInstance() {
+            return new RecoveringBlockWritable();
+          }
+        });
+  }
+
+  public void write(DataOutput out) throws IOException {
+    locatedBlock.write(out);
+    out.writeLong(newGenerationStamp);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    locatedBlock = new LocatedBlockWritable();
+    locatedBlock.readFields(in);
+    newGenerationStamp = in.readLong();
+  }
+
+  public RecoveringBlock convert() {
+    ExtendedBlockWritable eb = locatedBlock.getBlock();
+    DatanodeInfoWritable[] dnInfo = locatedBlock.getLocations();
+    return new RecoveringBlock(ExtendedBlockWritable.convertExtendedBlock(eb),
+        DatanodeInfoWritable.convertDatanodeInfo(dnInfo), newGenerationStamp);
+  }
+
+  public static RecoveringBlockWritable convert(RecoveringBlock rBlock) {
+    if (rBlock == null) {
+      return null;
+    }
+    ExtendedBlockWritable eb = ExtendedBlockWritable
+        .convertExtendedBlock(rBlock.getBlock());
+    DatanodeInfoWritable[] dnInfo = DatanodeInfoWritable
+        .convertDatanodeInfo(rBlock.getLocations());
+    return new RecoveringBlockWritable(eb, dnInfo,
+        rBlock.getNewGenerationStamp());
+  }
+}
\ No newline at end of file
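
RecoveringBlockWritable's wire format is exactly what write() emits and readFields() consumes, in that order. A small sketch (not part of the patch; the class name is invented and it assumes it runs in the same package as RecoveringBlockWritable) of the serialization round trip using Hadoop's stock IO buffers:

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    class RecoveringBlockWireFormatSketch {
      static RecoveringBlockWritable roundTrip(RecoveringBlockWritable rb)
          throws java.io.IOException {
        DataOutputBuffer out = new DataOutputBuffer();
        rb.write(out);                        // locatedBlock, then newGenerationStamp
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        RecoveringBlockWritable copy = new RecoveringBlockWritable();
        copy.readFields(in);                  // must read in the same order
        return copy;
      }
    }
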
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReplicaRecoveryInfoWritable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReplicaRecoveryInfoWritable.java
new file mode 100644
index 0000000000..e6853600e4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocolR23Compatible/ReplicaRecoveryInfoWritable.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocolR23Compatible;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocolR23Compatible.BlockWritable;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Replica recovery information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReplicaRecoveryInfoWritable implements Writable {
+  private int originalState;
+  private BlockWritable block;
+
+  public ReplicaRecoveryInfoWritable() {
+  }
+
+  public ReplicaRecoveryInfoWritable(long blockId, long diskLen, long gs,
+      ReplicaState rState) {
+    block = new BlockWritable(blockId, diskLen, gs);
+    originalState = rState.getValue();
+  }
+
+  // /////////////////////////////////////////
+  // Writable
+  // /////////////////////////////////////////
+  static { // register a ctor
+    WritableFactories.setFactory(ReplicaRecoveryInfoWritable.class,
+        new WritableFactory() {
+          public Writable newInstance() {
+            return new ReplicaRecoveryInfoWritable();
+          }
+        });
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    block = new BlockWritable();
+    block.readFields(in);
+    originalState = in.readInt();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    block.write(out);
+    out.writeInt(originalState);
+  }
+
+  public static ReplicaRecoveryInfoWritable convert(ReplicaRecoveryInfo rrInfo) {
+    return new ReplicaRecoveryInfoWritable(rrInfo.getBlockId(),
+        rrInfo.getNumBytes(), rrInfo.getGenerationStamp(),
+        rrInfo.getOriginalReplicaState());
+  }
+
+  public ReplicaRecoveryInfo convert() {
+    return new ReplicaRecoveryInfo(block.getBlockId(), block.getNumBytes(),
+        block.getGenerationStamp(), ReplicaState.getState(originalState));
+  }
+}
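
Finally, every wire type in this patch follows the same internal-to-wire conversion pair. A sketch (illustrative only, not part of the patch) using ReplicaRecoveryInfoWritable; the constructor arguments mirror convert() above, and the getters on ReplicaRecoveryInfo are inherited from Block:

    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
    import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
    import org.apache.hadoop.hdfs.server.protocolR23Compatible.ReplicaRecoveryInfoWritable;

    class ReplicaRecoveryInfoConversionSketch {
      public static void main(String[] args) {
        // Internal type used inside the DN: blockId, bytes on disk,
        // generation stamp, and the replica's state before recovery.
        ReplicaRecoveryInfo info =
            new ReplicaRecoveryInfo(1L, 512L, 100L, ReplicaState.RWR);
        // To the wire type and back; every field must survive the round trip.
        ReplicaRecoveryInfoWritable w = ReplicaRecoveryInfoWritable.convert(info);
        ReplicaRecoveryInfo back = w.convert();
        assert back.getBlockId() == 1L;
        assert back.getNumBytes() == 512L;
        assert back.getGenerationStamp() == 100L;
        assert back.getOriginalReplicaState() == ReplicaState.RWR;
      }
    }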