diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index df434f4826..df85179ccc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -62,6 +62,9 @@ Trunk (unreleased changes) HDFS-3178. Add states and state handler for journal synchronization in JournalService. (szetszwo) + HDFS-3204. Minor modification to JournalProtocol.proto to make + it generic. (suresh) + OPTIMIZATIONS HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java index 0e8b534831..1858e70980 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java @@ -48,7 +48,7 @@ public JournalProtocolServerSideTranslatorPB(JournalProtocol impl) { public JournalResponseProto journal(RpcController unused, JournalRequestProto req) throws ServiceException { try { - impl.journal(PBHelper.convert(req.getRegistration()), + impl.journal(PBHelper.convert(req.getJournalInfo()), req.getFirstTxnId(), req.getNumTxns(), req.getRecords() .toByteArray()); } catch (IOException e) { @@ -62,7 +62,7 @@ public JournalResponseProto journal(RpcController unused, public StartLogSegmentResponseProto startLogSegment(RpcController controller, StartLogSegmentRequestProto req) throws ServiceException { try { - impl.startLogSegment(PBHelper.convert(req.getRegistration()), + impl.startLogSegment(PBHelper.convert(req.getJournalInfo()), req.getTxid()); } catch (IOException e) { throw new ServiceException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java index 76ca46f4e4..9258180e52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java @@ -24,12 +24,10 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto; -import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolMetaInterface; -import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; @@ -63,7 +61,7 @@ public void close() { public void journal(NamenodeRegistration reg, long firstTxnId, int numTxns, byte[] records) throws IOException { JournalRequestProto req = JournalRequestProto.newBuilder() - .setRegistration(PBHelper.convert(reg)) + .setJournalInfo(PBHelper.convertToJournalInfo(reg)) .setFirstTxnId(firstTxnId) .setNumTxns(numTxns) .setRecords(PBHelper.getByteString(records)) @@ -79,7 +77,7 @@ public void journal(NamenodeRegistration reg, long firstTxnId, public void startLogSegment(NamenodeRegistration registration, long txid) throws IOException { StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder() - .setRegistration(PBHelper.convert(registration)) + .setJournalInfo(PBHelper.convertToJournalInfo(registration)) .setTxid(txid) .build(); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index d21a86d92f..e084862e82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -95,6 +95,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto; import org.apache.hadoop.hdfs.security.token.block.BlockKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; @@ -127,7 +128,6 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; -import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; @@ -1346,4 +1346,26 @@ public static StorageReportProto convert(StorageReport r) { .setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining()) .setStorageID(r.getStorageID()).build(); } + + public static NamenodeRegistration convert(JournalInfoProto info) { + int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0; + int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0; + StorageInfo storage = new StorageInfo(lv, nsID, info.getClusterID(), 0); + + // Note that the role is always {@link NamenodeRole#NAMENODE} as this + // conversion happens for messages from Namenode to Journal receivers. + // Addresses in the registration are unused. + return new NamenodeRegistration("", "", storage, NamenodeRole.NAMENODE); + } + + /** + * Method used for converting {@link JournalInfoProto} sent from Namenode + * to Journal receivers to {@link NamenodeRegistration}. + */ + public static JournalInfoProto convertToJournalInfo( + NamenodeRegistration reg) { + return JournalInfoProto.newBuilder().setClusterID(reg.getClusterID()) + .setLayoutVersion(reg.getLayoutVersion()) + .setNamespaceID(reg.getNamespaceID()).build(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java index ece013fa55..3bf5d66640 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java @@ -331,8 +331,7 @@ private boolean tryConvergeJournalSpool() throws IOException { */ private synchronized void setState(BNState newState) { if (LOG.isDebugEnabled()) { - LOG.debug("State transition " + bnState + " -> " + newState, - new Exception("trace")); + LOG.debug("State transition " + bnState + " -> " + newState); } bnState = newState; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java index 706696fd66..ee08793eaa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java @@ -27,17 +27,15 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.NameNodeProxies; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService; import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB; import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.Storage; -import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; -import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; @@ -225,8 +223,6 @@ public boolean setSafeMode(SafeModeAction action) throws IOException { static class BackupNodeRpcServer extends NameNodeRpcServer implements JournalProtocol { - private final String nnRpcAddress; - private BackupNodeRpcServer(Configuration conf, BackupNode nn) throws IOException { super(conf, nn); @@ -236,8 +232,31 @@ private BackupNodeRpcServer(Configuration conf, BackupNode nn) .newReflectiveBlockingService(journalProtocolTranslator); DFSUtil.addPBProtocol(conf, JournalProtocolPB.class, service, this.clientRpcServer); - nnRpcAddress = nn.nnRpcAddress; } + + /** + * Verifies a journal request + * @param nodeReg node registration + * @throws UnregisteredNodeException if the registration is invalid + */ + void verifyJournalRequest(NamenodeRegistration reg) throws IOException { + verifyVersion(reg.getLayoutVersion()); + String errorMsg = null; + int expectedNamespaceID = namesystem.getNamespaceInfo().getNamespaceID(); + if (reg.getNamespaceID() != expectedNamespaceID) { + errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID + + " actual " + reg.getNamespaceID(); + LOG.warn(errorMsg); + throw new UnregisteredNodeException(reg); + } + if (!reg.getClusterID().equals(namesystem.getClusterId())) { + errorMsg = "Invalid clusterId in journal request - expected " + + reg.getClusterID() + " actual " + namesystem.getClusterId(); + LOG.warn(errorMsg); + throw new UnregisteredNodeException(reg); + } + } + ///////////////////////////////////////////////////// // BackupNodeProtocol implementation for backup node. @@ -246,8 +265,7 @@ private BackupNodeRpcServer(Configuration conf, BackupNode nn) public void startLogSegment(NamenodeRegistration registration, long txid) throws IOException { namesystem.checkOperation(OperationCategory.JOURNAL); - verifyRequest(registration); - + verifyJournalRequest(registration); getBNImage().namenodeStartedLogSegment(txid); } @@ -256,10 +274,7 @@ public void journal(NamenodeRegistration nnReg, long firstTxId, int numTxns, byte[] records) throws IOException { namesystem.checkOperation(OperationCategory.JOURNAL); - verifyRequest(nnReg); - if(!nnRpcAddress.equals(nnReg.getAddress())) - throw new IOException("Journal request from unexpected name-node: " - + nnReg.getAddress() + " expecting " + nnRpcAddress); + verifyJournalRequest(nnReg); getBNImage().journal(firstTxId, numTxns, records); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto index fa7ed15c64..c15347190e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto @@ -27,16 +27,25 @@ option java_generate_equals_and_hash = true; import "hdfs.proto"; /** - * registration - the registration info of the active NameNode - * firstTxnId - the first txid in the rolled edit log + * Journal information used by the journal receiver to identify a journal. + */ +message JournalInfoProto { + required string clusterID = 1; // ID of the cluster + optional uint32 layoutVersion = 2; // Layout version + optional uint32 namespaceID = 3; // Namespace ID +} + +/** + * JournalInfo - the information about the journal + * firstTxnId - the first txid in the journal records * numTxns - Number of transactions in editlog * records - bytes containing serialized journal records */ message JournalRequestProto { - required NamenodeRegistrationProto registration = 1; // Registration info - required uint64 firstTxnId = 2; // Transaction ID - required uint32 numTxns = 3; // Transaction ID - required bytes records = 4; // Journal record + required JournalInfoProto journalInfo = 1; + required uint64 firstTxnId = 2; + required uint32 numTxns = 3; + required bytes records = 4; } /** @@ -46,12 +55,12 @@ message JournalResponseProto { } /** - * registration - the registration info of the active NameNode + * JournalInfo - the information about the journal * txid - first txid in the new log */ message StartLogSegmentRequestProto { - required NamenodeRegistrationProto registration = 1; // Registration info - required uint64 txid = 2; // Transaction ID + required JournalInfoProto journalInfo = 1; + required uint64 txid = 2; } /**