From 74b4b45651037fc8d2a97d3b2de51334dcbf759c Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Sat, 7 Apr 2012 00:52:12 +0000 Subject: [PATCH] HDFS-3211. Add fence(..) and replace NamenodeRegistration with JournalInfo and epoch in JournalProtocol. Contributed by suresh git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1310649 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 15 ++-- .../protocol/UnregisteredNodeException.java | 5 ++ ...JournalProtocolServerSideTranslatorPB.java | 24 ++++- .../JournalProtocolTranslatorPB.java | 29 ++++-- .../hadoop/hdfs/protocolPB/PBHelper.java | 21 ++--- .../server/journalservice/JournalService.java | 88 ++++++++++++++++--- .../server/namenode/BackupJournalManager.java | 10 ++- .../hdfs/server/namenode/BackupNode.java | 46 ++++++---- .../namenode/EditLogBackupOutputStream.java | 18 ++-- .../hdfs/server/protocol/FenceResponse.java | 48 ++++++++++ .../hdfs/server/protocol/FencedException.java | 32 +++++++ .../hdfs/server/protocol/JournalInfo.java | 48 ++++++++++ .../hdfs/server/protocol/JournalProtocol.java | 27 ++++-- .../src/main/proto/JournalProtocol.proto | 38 +++++++- .../journalservice/TestJournalService.java | 33 ++++++- 15 files changed, 400 insertions(+), 82 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index dc3a6bbef0..115a855139 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -62,9 +62,6 @@ Trunk (unreleased changes) HDFS-3178. Add states and state handler for journal synchronization in JournalService. (szetszwo) - HDFS-3204. Minor modification to JournalProtocol.proto to make - it generic. (suresh) - OPTIMIZATIONS HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. @@ -335,6 +332,15 @@ Release 2.0.0 - UNRELEASED HDFS-3226. Allow GetConf tool to print arbitrary keys (todd) + HDFS-3204. Minor modification to JournalProtocol.proto to make + it generic. (suresh) + + HDFS-2505. Add a test to verify getFileChecksum(..) with ViewFS. (Ravi + Prakash via szetszwo) + + HDFS-3211. Add fence(..) and replace NamenodeRegistration with JournalInfo + and epoch in JournalProtocol. (suresh via szetszwo) + OPTIMIZATIONS HDFS-3024. Improve performance of stringification in addStoredBlock (todd) @@ -771,9 +777,6 @@ Release 0.23.3 - UNRELEASED IMPROVEMENTS - HDFS-2505. Add a test to verify getFileChecksum(..) with ViewFS. 
(Ravi - Prakash via szetszwo) - OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java index 941a320a79..eabdd22a97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; /** @@ -33,6 +34,10 @@ public class UnregisteredNodeException extends IOException { private static final long serialVersionUID = -5620209396945970810L; + public UnregisteredNodeException(JournalInfo info) { + super("Unregistered server: " + info.toString()); + } + public UnregisteredNodeException(NodeRegistration nodeReg) { super("Unregistered server: " + nodeReg.toString()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java index 1858e70980..1805d14664 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java @@ -20,10 +20,13 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto; +import org.apache.hadoop.hdfs.server.protocol.FenceResponse; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import com.google.protobuf.RpcController; @@ -48,9 +51,8 @@ public JournalProtocolServerSideTranslatorPB(JournalProtocol impl) { public JournalResponseProto journal(RpcController unused, JournalRequestProto req) throws ServiceException { try { - impl.journal(PBHelper.convert(req.getJournalInfo()), - req.getFirstTxnId(), req.getNumTxns(), req.getRecords() - .toByteArray()); + impl.journal(PBHelper.convert(req.getJournalInfo()), req.getEpoch(), + req.getFirstTxnId(), req.getNumTxns(), req.getRecords().toByteArray()); } catch (IOException e) { throw new ServiceException(e); } @@ -63,10 +65,24 @@ public StartLogSegmentResponseProto startLogSegment(RpcController controller, StartLogSegmentRequestProto req) throws ServiceException { try { impl.startLogSegment(PBHelper.convert(req.getJournalInfo()), - req.getTxid()); + req.getEpoch(), req.getTxid()); } catch (IOException e) { throw new ServiceException(e); } return StartLogSegmentResponseProto.newBuilder().build(); } + + @Override + public FenceResponseProto 
fence(RpcController controller, + FenceRequestProto req) throws ServiceException { + try { + FenceResponse resp = impl.fence(PBHelper.convert(req.getJournalInfo()), req.getEpoch(), + req.getFencerInfo()); + return FenceResponseProto.newBuilder().setInSync(resp.isInSync()) + .setLastTransactionId(resp.getLastTransactionId()) + .setPreviousEpoch(resp.getPreviousEpoch()).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java index 9258180e52..d14e4e22fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java @@ -22,10 +22,13 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto; +import org.apache.hadoop.hdfs.server.protocol.FenceResponse; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; -import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.RPC; @@ -58,10 +61,11 @@ public void close() { } @Override - public void journal(NamenodeRegistration reg, long firstTxnId, + public void journal(JournalInfo journalInfo, long epoch, long firstTxnId, int numTxns, byte[] records) throws IOException { JournalRequestProto req = JournalRequestProto.newBuilder() - .setJournalInfo(PBHelper.convertToJournalInfo(reg)) + .setJournalInfo(PBHelper.convert(journalInfo)) + .setEpoch(epoch) .setFirstTxnId(firstTxnId) .setNumTxns(numTxns) .setRecords(PBHelper.getByteString(records)) .build(); @@ -74,10 +78,11 @@ public void journal(NamenodeRegistration reg, long firstTxnId, @Override - public void startLogSegment(NamenodeRegistration registration, long txid) + public void startLogSegment(JournalInfo journalInfo, long epoch, long txid) throws IOException { StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder() - .setJournalInfo(PBHelper.convertToJournalInfo(registration)) + .setJournalInfo(PBHelper.convert(journalInfo)) + .setEpoch(epoch) .setTxid(txid) .build(); try { @@ -86,6 +91,20 @@ public void startLogSegment(NamenodeRegistration registration, long txid) throw ProtobufHelper.getRemoteException(e); } } + + @Override + public FenceResponse fence(JournalInfo journalInfo, long epoch, + String fencerInfo) throws IOException { + FenceRequestProto req = FenceRequestProto.newBuilder().setEpoch(epoch) + .setJournalInfo(PBHelper.convert(journalInfo)) + .setFencerInfo(fencerInfo).build(); + try { + FenceResponseProto resp = rpcProxy.fence(NULL_CONTROLLER, req); + return new FenceResponse(resp.getPreviousEpoch(), + resp.getLastTransactionId(), resp.getInSync()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } @Override public 
boolean isMethodSupported(String methodName) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index e084862e82..fc50606f4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -110,6 +110,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -117,6 +118,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; @@ -127,7 +129,6 @@ import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; -import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; @@ -1347,25 +1348,19 @@ public static StorageReportProto convert(StorageReport r) { .setStorageID(r.getStorageID()).build(); } - public static NamenodeRegistration convert(JournalInfoProto info) { + public static JournalInfo convert(JournalInfoProto info) { int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0; int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0; - StorageInfo storage = new StorageInfo(lv, nsID, info.getClusterID(), 0); - - // Note that the role is always {@link NamenodeRole#NAMENODE} as this - // conversion happens for messages from Namenode to Journal receivers. - // Addresses in the registration are unused. - return new NamenodeRegistration("", "", storage, NamenodeRole.NAMENODE); + return new JournalInfo(lv, info.getClusterID(), nsID); } /** * Method used for converting {@link JournalInfo} into {@link JournalInfoProto} * to be sent from the Namenode to Journal receivers. 
*/ - public static JournalInfoProto convertToJournalInfo( - NamenodeRegistration reg) { - return JournalInfoProto.newBuilder().setClusterID(reg.getClusterID()) - .setLayoutVersion(reg.getLayoutVersion()) - .setNamespaceID(reg.getNamespaceID()).build(); + public static JournalInfoProto convert(JournalInfo j) { + return JournalInfoProto.newBuilder().setClusterID(j.getClusterId()) + .setLayoutVersion(j.getLayoutVersion()) + .setNamespaceID(j.getNamespaceId()).build(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java index 71210c6140..4e25eea313 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java @@ -31,6 +31,9 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.protocol.FenceResponse; +import org.apache.hadoop.hdfs.server.protocol.FencedException; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; @@ -40,6 +43,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.BlockingService; /** @@ -66,6 +70,8 @@ public class JournalService implements JournalProtocol { private final NamenodeProtocol namenode; private final StateHandler stateHandler = new StateHandler(); private final RPC.Server rpcServer; + private long epoch = 0; + private String fencerInfo; enum State { /** The service is initialized and ready to start. 
*/ @@ -115,7 +121,7 @@ synchronized void waitForRoll() { current = State.WAITING_FOR_ROLL; } - synchronized void startLogSegment() throws IOException { + synchronized void startLogSegment() { if (current == State.WAITING_FOR_ROLL) { current = State.SYNCING; } @@ -232,28 +238,42 @@ public void stop() { } @Override - public void journal(NamenodeRegistration registration, long firstTxnId, + public void journal(JournalInfo journalInfo, long epoch, long firstTxnId, int numTxns, byte[] records) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Received journal " + firstTxnId + " " + numTxns); } stateHandler.isJournalAllowed(); - verify(registration); + verify(epoch, journalInfo); listener.journal(this, firstTxnId, numTxns, records); } @Override - public void startLogSegment(NamenodeRegistration registration, long txid) + public void startLogSegment(JournalInfo journalInfo, long epoch, long txid) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Received startLogSegment " + txid); } stateHandler.isStartLogSegmentAllowed(); - verify(registration); + verify(epoch, journalInfo); listener.rollLogs(this, txid); stateHandler.startLogSegment(); } + @Override + public FenceResponse fence(JournalInfo journalInfo, long epoch, + String fencerInfo) throws IOException { + LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch); + verifyFence(epoch, fencerInfo); + verify(journalInfo); + long previousEpoch = this.epoch; + this.epoch = epoch; + this.fencerInfo = fencerInfo; + + // TODO:HDFS-3092 set lastTransId and inSync + return new FenceResponse(previousEpoch, 0, false); + } + /** Create an RPC server. */ private static RPC.Server createRpcServer(Configuration conf, InetSocketAddress address, JournalProtocol impl) throws IOException { @@ -267,15 +287,54 @@ private static RPC.Server createRpcServer(Configuration conf, address.getHostName(), address.getPort(), 1, false, conf, null); } - private void verify(NamenodeRegistration reg) throws IOException { - if (!registration.getRegistrationID().equals(reg.getRegistrationID())) { - LOG.warn("Invalid registrationID - expected: " - + registration.getRegistrationID() + " received: " - + reg.getRegistrationID()); - throw new UnregisteredNodeException(reg); + private void verifyEpoch(long e) throws FencedException { + if (epoch != e) { + String errorMsg = "Epoch " + e + " is not valid. " + + "Resource has already been fenced by " + fencerInfo + + " with epoch " + epoch; + LOG.warn(errorMsg); + throw new FencedException(errorMsg); } } + private void verifyFence(long e, String fencer) throws FencedException { + if (e <= epoch) { + String errorMsg = "Epoch " + e + " from fencer " + fencer + + " is not valid. " + "Resource has already been fenced by " + + fencerInfo + " with epoch " + epoch; + LOG.warn(errorMsg); + throw new FencedException(errorMsg); + } + } + + /** + * Verifies a journal request + */ + private void verify(JournalInfo journalInfo) throws IOException { + String errorMsg = null; + int expectedNamespaceID = registration.getNamespaceID(); + if (journalInfo.getNamespaceId() != expectedNamespaceID) { + errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID + + " actual " + journalInfo.getNamespaceId(); + LOG.warn(errorMsg); + throw new UnregisteredNodeException(journalInfo); + } + if (!journalInfo.getClusterId().equals(registration.getClusterID())) { + errorMsg = "Invalid clusterId in journal request - expected " + + registration.getClusterID() + " actual " + journalInfo.getClusterId(); + LOG.warn(errorMsg); + throw new UnregisteredNodeException(journalInfo); + } + } + + /** + * Verifies the epoch and then the journal request + */ + private void verify(long e, JournalInfo journalInfo) throws IOException { + verifyEpoch(e); + verify(journalInfo); + } + /** * Register this service with the active namenode. */ @@ -298,4 +357,9 @@ private void handshake() throws IOException { listener.verifyVersion(this, nsInfo); registration.setStorageInfo(nsInfo); } -} \ No newline at end of file + + @VisibleForTesting + long getEpoch() { + return epoch; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java index de75b76934..ebf4f480f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java @@ -19,6 +19,7 @@ import java.io.IOException; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; /** @@ -26,19 +27,20 @@ * to a BackupNode. 
*/ class BackupJournalManager implements JournalManager { - - private final NamenodeRegistration nnReg; private final NamenodeRegistration bnReg; + private final JournalInfo journalInfo; BackupJournalManager(NamenodeRegistration bnReg, NamenodeRegistration nnReg) { + journalInfo = new JournalInfo(nnReg.getLayoutVersion(), + nnReg.getClusterID(), nnReg.getNamespaceID()); this.bnReg = bnReg; - this.nnReg = nnReg; } @Override public EditLogOutputStream startLogSegment(long txId) throws IOException { - EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, nnReg); + EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, + journalInfo); stm.startLogSegment(txId); return stm; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java index ee08793eaa..1f005b016f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java @@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.protocol.FenceResponse; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; @@ -217,7 +219,8 @@ void stop(boolean reportError) { } /* @Override */// NameNode - public boolean setSafeMode(SafeModeAction action) throws IOException { + public boolean setSafeMode(@SuppressWarnings("unused") SafeModeAction action) + throws IOException { throw new UnsupportedActionException("setSafeMode"); } @@ -236,51 +239,56 @@ private BackupNodeRpcServer(Configuration conf, BackupNode nn) /** * Verifies a journal request - * @param nodeReg node registration - * @throws UnregisteredNodeException if the registration is invalid */ - void verifyJournalRequest(NamenodeRegistration reg) throws IOException { - verifyVersion(reg.getLayoutVersion()); + private void verifyJournalRequest(JournalInfo journalInfo) + throws IOException { + verifyVersion(journalInfo.getLayoutVersion()); String errorMsg = null; int expectedNamespaceID = namesystem.getNamespaceInfo().getNamespaceID(); - if (reg.getNamespaceID() != expectedNamespaceID) { + if (journalInfo.getNamespaceId() != expectedNamespaceID) { errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID - + " actual " + reg.getNamespaceID(); + + " actual " + journalInfo.getNamespaceId(); LOG.warn(errorMsg); - throw new UnregisteredNodeException(reg); + throw new UnregisteredNodeException(journalInfo); } - if (!reg.getClusterID().equals(namesystem.getClusterId())) { + if (!journalInfo.getClusterId().equals(namesystem.getClusterId())) { errorMsg = "Invalid clusterId in journal request - expected " - + reg.getClusterID() + " actual " + namesystem.getClusterId(); + + namesystem.getClusterId() + " actual " + journalInfo.getClusterId(); LOG.warn(errorMsg); - throw new UnregisteredNodeException(reg); + throw new UnregisteredNodeException(journalInfo); } } - ///////////////////////////////////////////////////// // BackupNodeProtocol 
implementation for backup node. ///////////////////////////////////////////////////// @Override - public void startLogSegment(NamenodeRegistration registration, long txid) - throws IOException { + public void startLogSegment(JournalInfo journalInfo, long epoch, + long txid) throws IOException { namesystem.checkOperation(OperationCategory.JOURNAL); - verifyJournalRequest(registration); + verifyJournalRequest(journalInfo); getBNImage().namenodeStartedLogSegment(txid); } @Override - public void journal(NamenodeRegistration nnReg, - long firstTxId, int numTxns, - byte[] records) throws IOException { + public void journal(JournalInfo journalInfo, long epoch, long firstTxId, + int numTxns, byte[] records) throws IOException { namesystem.checkOperation(OperationCategory.JOURNAL); - verifyJournalRequest(nnReg); + verifyJournalRequest(journalInfo); getBNImage().journal(firstTxId, numTxns, records); } private BackupImage getBNImage() { return (BackupImage)nn.getFSImage(); } + + @Override + public FenceResponse fence(JournalInfo journalInfo, long epoch, + String fencerInfo) throws IOException { + LOG.info("Fence request by " + fencerInfo + " with epoch " + epoch); + throw new UnsupportedOperationException( + "BackupNode does not support fence"); + } } ////////////////////////////////////////////////////// diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java index bdb4c5e773..5a28f7c512 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.io.DataOutputBuffer; @@ -42,18 +43,18 @@ class EditLogBackupOutputStream extends EditLogOutputStream { static int DEFAULT_BUFFER_SIZE = 256; - private JournalProtocol backupNode; // RPC proxy to backup node - private NamenodeRegistration bnRegistration; // backup node registration - private NamenodeRegistration nnRegistration; // active node registration + private final JournalProtocol backupNode; // RPC proxy to backup node + private final NamenodeRegistration bnRegistration; // backup node registration + private final JournalInfo journalInfo; // active node's journal info + private final DataOutputBuffer out; // serialized output sent to backup node private EditsDoubleBuffer doubleBuf; - private DataOutputBuffer out; // serialized output sent to backup node EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node - NamenodeRegistration nnReg) // active name-node + JournalInfo journalInfo) // active name-node's journal info throws IOException { super(); this.bnRegistration = bnReg; - this.nnRegistration = nnReg; + this.journalInfo = journalInfo; InetSocketAddress bnAddress = NetUtils.createSocketAddr(bnRegistration.getAddress()); try { @@ -127,8 +128,7 @@ protected void flushAndSync() throws IOException { out.reset(); assert out.getLength() == 0 : "Output buffer is not empty"; - backupNode.journal(nnRegistration, - 
firstTxToFlush, numReadyTxns, data); + backupNode.journal(journalInfo, 0, firstTxToFlush, numReadyTxns, data); } } @@ -140,6 +140,6 @@ NamenodeRegistration getRegistration() { } void startLogSegment(long txId) throws IOException { - backupNode.startLogSegment(nnRegistration, txId); + backupNode.startLogSegment(journalInfo, 0, txId); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java new file mode 100644 index 0000000000..5bbd76dd88 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Response to a journal fence request. See {@link JournalProtocol#fence} + */ +@InterfaceAudience.Private +public class FenceResponse { + private final long previousEpoch; + private final long lastTransactionId; + private final boolean isInSync; + + public FenceResponse(long previousEpoch, long lastTransId, boolean inSync) { + this.previousEpoch = previousEpoch; + this.lastTransactionId = lastTransId; + this.isInSync = inSync; + } + + public boolean isInSync() { + return isInSync; + } + + public long getLastTransactionId() { + return lastTransactionId; + } + + public long getPreviousEpoch() { + return previousEpoch; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java new file mode 100644 index 0000000000..2f9f54bd7e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.protocol; + +import java.io.IOException; + +/** + * Thrown when a previous user of a shared resource attempts to use the + * resource after it has been fenced by another user. + */ +public class FencedException extends IOException { + private static final long serialVersionUID = 1L; + + public FencedException(String errorMsg) { + super(errorMsg); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java new file mode 100644 index 0000000000..530934d237 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Information that describes a journal + */ +@InterfaceAudience.Private +public class JournalInfo { + private final int layoutVersion; + private final String clusterId; + private final int namespaceId; + + public JournalInfo(int lv, String clusterId, int nsId) { + this.layoutVersion = lv; + this.clusterId = clusterId; + this.namespaceId = nsId; + } + + public int getLayoutVersion() { + return layoutVersion; + } + + public String getClusterId() { + return clusterId; + } + + public int getNamespaceId() { + return namespaceId; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java index b9d55151f8..be514b96ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java @@ -21,7 +21,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.KerberosInfo; /** @@ -53,12 +52,15 @@ public interface JournalProtocol { * via {@code EditLogBackupOutputStream} in order to synchronize meta-data * changes with the backup namespace image. 
* - * @param registration active node registration + * @param journalInfo journal information + * @param epoch marks the beginning of a new journal writer * @param firstTxnId the first transaction of this batch * @param numTxns number of transactions * @param records byte array containing serialized journal records + * @throws FencedException if the resource has been fenced */ - public void journal(NamenodeRegistration registration, + public void journal(JournalInfo journalInfo, + long epoch, long firstTxnId, int numTxns, byte[] records) throws IOException; @@ -66,9 +68,24 @@ public void journal(NamenodeRegistration registration, /** * Notify the BackupNode that the NameNode has rolled its edit logs * and is now writing a new log segment. - * @param registration the registration of the active NameNode + * @param journalInfo journal information + * @param epoch marks the beginning of a new journal writer * @param txid the first txid in the new log + * @throws FencedException if the resource has been fenced */ - public void startLogSegment(NamenodeRegistration registration, + public void startLogSegment(JournalInfo journalInfo, long epoch, long txid) throws IOException; + + /** + * Request to fence any other journal writers. + * Older writers at a previous epoch will be fenced and can no longer + * perform journal operations. + * + * @param journalInfo journal information + * @param epoch marks the beginning of a new journal writer + * @param fencerInfo info about the fencer for debugging purposes + * @throws FencedException if the resource has been fenced + */ + public FenceResponse fence(JournalInfo journalInfo, long epoch, + String fencerInfo) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto index c15347190e..1e720bab05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto @@ -36,16 +36,18 @@ message JournalInfoProto { } /** - * JournalInfo - the information about the journal + * journalInfo - the information about the journal * firstTxnId - the first txid in the journal records * numTxns - Number of transactions in editlog * records - bytes containing serialized journal records + * epoch - a change in this value represents a change of journal writer */ message JournalRequestProto { required JournalInfoProto journalInfo = 1; required uint64 firstTxnId = 2; required uint32 numTxns = 3; required bytes records = 4; + required uint64 epoch = 5; } /** @@ -55,12 +57,13 @@ message JournalResponseProto { } /** - * JournalInfo - the information about the journal + * journalInfo - the information about the journal * txid - first txid in the new log */ message StartLogSegmentRequestProto { - required JournalInfoProto journalInfo = 1; - required uint64 txid = 2; + required JournalInfoProto journalInfo = 1; // Info about the journal + required uint64 txid = 2; // Transaction ID + required uint64 epoch = 3; } /** @@ -69,6 +72,27 @@ message StartLogSegmentRequestProto { message StartLogSegmentResponseProto { } +/** + * journalInfo - the information about the journal + * epoch - the epoch of the writer requesting the fence + * fencerInfo - info about the fencer, for debugging + */ +message FenceRequestProto { + required JournalInfoProto journalInfo = 1; // Info about the journal + required uint64 epoch = 2; // Epoch - change indicates change in writer + optional string fencerInfo = 3; // Info about fencer for debugging +} + +/** + * previousEpoch - previous epoch if any or zero + * 
lastTransactionId - last valid transaction Id in the journal + * inSync - if all journal segments are available and in sync + */ +message FenceResponseProto { + optional uint64 previousEpoch = 1; + optional uint64 lastTransactionId = 2; + optional bool inSync = 3; +} + /** * Protocol used to journal edits to a remote node. Currently, * this is used to publish edits from the NameNode to a BackupNode. @@ -89,4 +113,10 @@ service JournalProtocolService { */ rpc startLogSegment(StartLogSegmentRequestProto) returns (StartLogSegmentResponseProto); + + /** + * Request to fence a journal receiver. + */ + rpc fence(FenceRequestProto) + returns (FenceResponseProto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java index 03c511f319..ab3ce9fee3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java @@ -20,12 +20,18 @@ import java.io.IOException; import java.net.InetSocketAddress; +import junit.framework.Assert; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.FenceResponse; +import org.apache.hadoop.hdfs.server.protocol.FencedException; +import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.junit.Test; import org.mockito.Mockito; @@ -42,7 +48,7 @@ public class TestJournalService { * called. */ @Test - public void testCallBacks() throws IOException { + public void testCallBacks() throws Exception { JournalListener listener = Mockito.mock(JournalListener.class); JournalService service = null; try { @@ -51,6 +57,7 @@ public void testCallBacks() throws IOException { service = startJournalService(listener); verifyRollLogsCallback(service, listener); verifyJournalCallback(service, listener); + verifyFence(service, cluster.getNameNode(0)); } finally { if (service != null) { service.stop(); @@ -93,4 +100,28 @@ private void verifyJournalCallback(JournalService s, JournalListener l) throws I Mockito.verify(l, Mockito.atLeastOnce()).journal(Mockito.eq(s), Mockito.anyLong(), Mockito.anyInt(), (byte[]) Mockito.any()); } + + public void verifyFence(JournalService s, NameNode nn) throws Exception { + String cid = nn.getNamesystem().getClusterId(); + int nsId = nn.getNamesystem().getFSImage().getNamespaceID(); + int lv = nn.getNamesystem().getFSImage().getLayoutVersion(); + + // Fence the journal service + JournalInfo info = new JournalInfo(lv, cid, nsId); + long currentEpoch = s.getEpoch(); + + // New epoch lower than the current epoch is rejected + try { + s.fence(info, (currentEpoch - 1), "fencer"); + Assert.fail("Fencing with a lower epoch should have thrown FencedException"); + } catch (FencedException ignore) { /* Expected */ } + + // New epoch equal to the current epoch is rejected + try { + s.fence(info, currentEpoch, "fencer"); + Assert.fail("Fencing with the same epoch should have thrown FencedException"); + } catch (FencedException ignore) { /* Expected */ } + + // New epoch higher than the current epoch is successful + FenceResponse resp = s.fence(info, currentEpoch+1, "fencer"); + Assert.assertNotNull(resp); + } } \ No newline at end of file
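
Taken together, the protocol change works as a compare-and-swap on the epoch: fence(..) succeeds only with an epoch strictly greater than the journal's current one, and every subsequent journal(..) or startLogSegment(..) call must carry exactly the fenced epoch or fail with FencedException. The following minimal sketch shows how a prospective writer would take over a journal under these rules. It is illustrative only and not part of this patch: the JournalWriterTakeover class and the "fencer-host:port" string are hypothetical, and the inSync/lastTransactionId handling is a stub because JournalService currently returns (previousEpoch, 0, false) pending HDFS-3092.

import java.io.IOException;

import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;

/** Illustrative only: how a new writer would claim a journal receiver. */
class JournalWriterTakeover {
  private final JournalProtocol journal; // e.g. a JournalProtocolTranslatorPB proxy
  private final JournalInfo info;        // must match the receiver's cluster/namespace ids

  JournalWriterTakeover(JournalProtocol journal, JournalInfo info) {
    this.journal = journal;
    this.info = info;
  }

  /** Fence the previous writer, then write under the new epoch. */
  void takeOver(long newEpoch, long firstTxId, byte[] records) throws IOException {
    // Throws FencedException unless newEpoch > the journal's current epoch.
    FenceResponse resp = journal.fence(info, newEpoch, "fencer-host:port");
    if (!resp.isInSync()) {
      // TODO: HDFS-3092 - recover segments up to resp.getLastTransactionId()
    }
    // From here on, calls carrying any other epoch are rejected by verifyEpoch(..).
    journal.startLogSegment(info, newEpoch, firstTxId);
    journal.journal(info, newEpoch, firstTxId, 1, records); // one serialized txn
  }
}

Note that only JournalService honors this handshake; BackupNodeRpcServer rejects fence(..) with UnsupportedOperationException, which is why EditLogBackupOutputStream continues to pass epoch 0 on the BackupNode path.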