diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 3c639f696f..55f69afe49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -120,6 +120,8 @@ Trunk (Unreleased) HDFS-4633 TestDFSClientExcludedNodes fails sporadically if excluded nodes cache expires too quickly (Chris Nauroth via Sanjay) + HDFS-4904. Remove JournalService. (Arpit Agarwal via cnauroth) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java deleted file mode 100644 index 2d5ec9e986..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.journalservice; - -import java.io.IOException; - -import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; - -/** - * JournalListener is a callback interface to handle journal records - * received from the namenode. - */ -public interface JournalListener { - /** - * Check the namespace information returned by a namenode - * @param service service that is making the callback - * @param info returned namespace information from the namenode - * - * The application using {@link JournalService} can stop the service if - * {@code info} validation fails. - */ - public void verifyVersion(JournalService service, NamespaceInfo info); - - /** - * Process the received Journal record - * @param service {@link JournalService} making the callback - * @param firstTxnId first transaction Id in the journal - * @param numTxns number of records - * @param records journal records - * @throws IOException on error - * - * Any IOException thrown from the listener is thrown back in - * {@link JournalProtocol#journal} - */ - public void journal(JournalService service, long firstTxnId, int numTxns, - byte[] records) throws IOException; - - /** - * Roll the editlog - * @param service {@link JournalService} making the callback - * @param txid transaction ID to roll at - * - * Any IOException thrown from the listener is thrown back in - * {@link JournalProtocol#startLogSegment} - */ - public void startLogSegment(JournalService service, long txid) throws IOException; -} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java deleted file mode 100644 index 4764259506..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java +++ /dev/null @@ -1,366 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.journalservice; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.NameNodeProxies; -import org.apache.hadoop.hdfs.protocol.LayoutVersion; -import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; -import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService; -import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB; -import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; -import org.apache.hadoop.hdfs.server.common.StorageInfo; -import org.apache.hadoop.hdfs.server.protocol.FenceResponse; -import org.apache.hadoop.hdfs.server.protocol.FencedException; -import org.apache.hadoop.hdfs.server.protocol.JournalInfo; -import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; -import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; - -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.BlockingService; - -/** - * This class interfaces with the namenode using {@link JournalProtocol} over - * RPC. It has two modes:
- * - * - * The received journal operations are sent to a listener over callbacks. The - * listener implementation can handle the callbacks based on the application - * requirement. - */ -public class JournalService implements JournalProtocol { - public static final Log LOG = LogFactory.getLog(JournalService.class.getName()); - - private final JournalListener listener; - private final InetSocketAddress nnAddress; - private final NamenodeRegistration registration; - private final NamenodeProtocol namenode; - private final StateHandler stateHandler = new StateHandler(); - private final RPC.Server rpcServer; - private long epoch = 0; - private String fencerInfo; - - enum State { - /** The service is initialized and ready to start. */ - INIT(false, false), - /** - * RPC server is started. - * The service is ready to receive requests from namenode. - */ - STARTED(false, false), - /** The service is fenced by a namenode and waiting for roll. */ - WAITING_FOR_ROLL(false, true), - /** - * The existing log is syncing with another source - * and it accepts journal from Namenode. - */ - SYNCING(true, true), - /** The existing log is in sync and it accepts journal from Namenode. */ - IN_SYNC(true, true), - /** The service is stopped. */ - STOPPED(false, false); - - final boolean isJournalAllowed; - final boolean isStartLogSegmentAllowed; - - State(boolean isJournalAllowed, boolean isStartLogSegmentAllowed) { - this.isJournalAllowed = isJournalAllowed; - this.isStartLogSegmentAllowed = isStartLogSegmentAllowed; - } - } - - static class StateHandler { - State current = State.INIT; - - synchronized void start() { - if (current != State.INIT) { - throw new IllegalStateException("Service cannot be started in " - + current + " state."); - } - current = State.STARTED; - } - - synchronized void waitForRoll() { - if (current != State.STARTED) { - throw new IllegalStateException("Cannot wait-for-roll in " + current - + " state."); - } - current = State.WAITING_FOR_ROLL; - } - - synchronized void startLogSegment() { - if (current == State.WAITING_FOR_ROLL) { - current = State.SYNCING; - } - } - - synchronized void isStartLogSegmentAllowed() throws IOException { - if (!current.isStartLogSegmentAllowed) { - throw new IOException("Cannot start log segment in " + current - + " state."); - } - } - - synchronized void isJournalAllowed() throws IOException { - if (!current.isJournalAllowed) { - throw new IOException("Cannot journal in " + current + " state."); - } - } - - synchronized boolean isStopped() { - if (current == State.STOPPED) { - LOG.warn("Ignore stop request since the service is in " + current - + " state."); - return true; - } - current = State.STOPPED; - return false; - } - } - - /** - * Constructor to create {@link JournalService} where an RPC server is - * created by this service. - * @param conf Configuration - * @param nnAddr host:port for the active Namenode's RPC server - * @param serverAddress address to start RPC server to receive - * {@link JournalProtocol} requests. This can be null, if - * {@code server} is a valid server that is managed out side this - * service. - * @param listener call-back interface to listen to journal activities - * @throws IOException on error - */ - JournalService(Configuration conf, InetSocketAddress nnAddr, - InetSocketAddress serverAddress, JournalListener listener) - throws IOException { - this.nnAddress = nnAddr; - this.listener = listener; - this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr, - NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true) - .getProxy(); - this.rpcServer = createRpcServer(conf, serverAddress, this); - - String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress()); - StorageInfo storage = new StorageInfo( - LayoutVersion.getCurrentLayoutVersion(), 0, "", 0); - registration = new NamenodeRegistration(addr, "", storage, - NamenodeRole.BACKUP); - } - - /** - * Start the service. - */ - public void start() { - stateHandler.start(); - - // Start the RPC server - LOG.info("Starting rpc server"); - rpcServer.start(); - - for(boolean registered = false, handshakeComplete = false; ; ) { - try { - // Perform handshake - if (!handshakeComplete) { - handshake(); - handshakeComplete = true; - LOG.info("handshake completed"); - } - - // Register with the namenode - if (!registered) { - registerWithNamenode(); - registered = true; - LOG.info("Registration completed"); - break; - } - } catch (IOException ioe) { - LOG.warn("Encountered exception ", ioe); - } catch (Exception e) { - LOG.warn("Encountered exception ", e); - } - - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { - LOG.warn("Encountered exception ", ie); - } - } - - stateHandler.waitForRoll(); - try { - namenode.rollEditLog(); - } catch (IOException e) { - LOG.warn("Encountered exception ", e); - } - } - - /** - * Stop the service. For application with RPC Server managed outside, the - * RPC Server must be stopped the application. - */ - public void stop() { - if (!stateHandler.isStopped()) { - rpcServer.stop(); - } - } - - @Override - public void journal(JournalInfo journalInfo, long epoch, long firstTxnId, - int numTxns, byte[] records) throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Received journal " + firstTxnId + " " + numTxns); - } - stateHandler.isJournalAllowed(); - verify(epoch, journalInfo); - listener.journal(this, firstTxnId, numTxns, records); - } - - @Override - public void startLogSegment(JournalInfo journalInfo, long epoch, long txid) - throws IOException { - if (LOG.isTraceEnabled()) { - LOG.trace("Received startLogSegment " + txid); - } - stateHandler.isStartLogSegmentAllowed(); - verify(epoch, journalInfo); - listener.startLogSegment(this, txid); - stateHandler.startLogSegment(); - } - - @Override - public FenceResponse fence(JournalInfo journalInfo, long epoch, - String fencerInfo) throws IOException { - LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch); - verifyFence(epoch, fencerInfo); - verify(journalInfo); - long previousEpoch = epoch; - this.epoch = epoch; - this.fencerInfo = fencerInfo; - - // TODO:HDFS-3092 set lastTransId and inSync - return new FenceResponse(previousEpoch, 0, false); - } - - /** Create an RPC server. */ - private static RPC.Server createRpcServer(Configuration conf, - InetSocketAddress address, JournalProtocol impl) throws IOException { - RPC.setProtocolEngine(conf, JournalProtocolPB.class, - ProtobufRpcEngine.class); - JournalProtocolServerSideTranslatorPB xlator = - new JournalProtocolServerSideTranslatorPB(impl); - BlockingService service = - JournalProtocolService.newReflectiveBlockingService(xlator); - return new RPC.Builder(conf).setProtocol(JournalProtocolPB.class) - .setInstance(service).setBindAddress(address.getHostName()) - .setPort(address.getPort()).setNumHandlers(1).setVerbose(false).build(); - } - - private void verifyEpoch(long e) throws FencedException { - if (epoch != e) { - String errorMsg = "Epoch " + e + " is not valid. " - + "Resource has already been fenced by " + fencerInfo - + " with epoch " + epoch; - LOG.warn(errorMsg); - throw new FencedException(errorMsg); - } - } - - private void verifyFence(long e, String fencer) throws FencedException { - if (e <= epoch) { - String errorMsg = "Epoch " + e + " from fencer " + fencer - + " is not valid. " + "Resource has already been fenced by " - + fencerInfo + " with epoch " + epoch; - LOG.warn(errorMsg); - throw new FencedException(errorMsg); - } - } - - /** - * Verifies a journal request - */ - private void verify(JournalInfo journalInfo) throws IOException { - String errorMsg = null; - int expectedNamespaceID = registration.getNamespaceID(); - if (journalInfo.getNamespaceId() != expectedNamespaceID) { - errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID - + " actual " + journalInfo.getNamespaceId(); - LOG.warn(errorMsg); - throw new UnregisteredNodeException(journalInfo); - } - if (!journalInfo.getClusterId().equals(registration.getClusterID())) { - errorMsg = "Invalid clusterId in journal request - expected " - + journalInfo.getClusterId() + " actual " + registration.getClusterID(); - LOG.warn(errorMsg); - throw new UnregisteredNodeException(journalInfo); - } - } - - /** - * Verifies a journal request - */ - private void verify(long e, JournalInfo journalInfo) throws IOException { - verifyEpoch(e); - verify(journalInfo); - } - - /** - * Register this service with the active namenode. - */ - private void registerWithNamenode() throws IOException { - NamenodeRegistration nnReg = namenode.register(registration); - String msg = null; - if(nnReg == null) { // consider as a rejection - msg = "Registration rejected by " + nnAddress; - } else if(!nnReg.isRole(NamenodeRole.NAMENODE)) { - msg = " Name-node " + nnAddress + " is not active"; - } - if(msg != null) { - LOG.error(msg); - throw new IOException(msg); // stop the node - } - } - - private void handshake() throws IOException { - NamespaceInfo nsInfo = namenode.versionRequest(); - listener.verifyVersion(this, nsInfo); - registration.setStorageInfo(nsInfo); - } - - @VisibleForTesting - long getEpoch() { - return epoch; - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java deleted file mode 100644 index 41a4fe4555..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.journalservice; - -import static org.junit.Assert.assertNotNull; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileSystemTestHelper; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.protocol.FenceResponse; -import org.apache.hadoop.hdfs.server.protocol.FencedException; -import org.apache.hadoop.hdfs.server.protocol.JournalInfo; -import org.junit.Test; -import org.mockito.Mockito; - -/** - * Tests for {@link JournalService} - */ -public class TestJournalService { - private MiniDFSCluster cluster; - private Configuration conf = new HdfsConfiguration(); - - /** - * Test calls backs {@link JournalListener#startLogSegment(JournalService, long)} and - * {@link JournalListener#journal(JournalService, long, int, byte[])} are - * called. - */ - @Test - public void testCallBacks() throws Exception { - JournalListener listener = Mockito.mock(JournalListener.class); - JournalService service = null; - try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(0); - service = startJournalService(listener); - verifyRollLogsCallback(service, listener); - verifyJournalCallback(service, listener); - verifyFence(service, cluster.getNameNode(0)); - } finally { - if (service != null) { - service.stop(); - } - if (cluster != null) { - cluster.shutdown(); - } - } - } - - private JournalService startJournalService(JournalListener listener) - throws IOException { - InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress(); - InetSocketAddress serverAddr = new InetSocketAddress(0); - JournalService service = new JournalService(conf, nnAddr, serverAddr, - listener); - service.start(); - return service; - } - - /** - * Starting {@link JournalService} should result in Namenode calling - * {@link JournalService#startLogSegment}, resulting in callback - * {@link JournalListener#rollLogs} - */ - private void verifyRollLogsCallback(JournalService s, JournalListener l) - throws IOException { - Mockito.verify(l, Mockito.times(1)).startLogSegment(Mockito.eq(s), Mockito.anyLong()); - } - - /** - * File system write operations should result in JournalListener call - * backs. - */ - private void verifyJournalCallback(JournalService s, JournalListener l) throws IOException { - Path fileName = new Path("/tmp/verifyJournalCallback"); - FileSystem fs = cluster.getFileSystem(); - FileSystemTestHelper.createFile(fs, fileName); - fs.delete(fileName, true); - Mockito.verify(l, Mockito.atLeastOnce()).journal(Mockito.eq(s), - Mockito.anyLong(), Mockito.anyInt(), (byte[]) Mockito.any()); - } - - public void verifyFence(JournalService s, NameNode nn) throws Exception { - String cid = nn.getNamesystem().getClusterId(); - int nsId = nn.getNamesystem().getFSImage().getNamespaceID(); - int lv = nn.getNamesystem().getFSImage().getLayoutVersion(); - - // Fence the journal service - JournalInfo info = new JournalInfo(lv, cid, nsId); - long currentEpoch = s.getEpoch(); - - // New epoch lower than the current epoch is rejected - try { - s.fence(info, (currentEpoch - 1), "fencer"); - } catch (FencedException ignore) { /* Ignored */ } - - // New epoch equal to the current epoch is rejected - try { - s.fence(info, currentEpoch, "fencer"); - } catch (FencedException ignore) { /* Ignored */ } - - // New epoch higher than the current epoch is successful - FenceResponse resp = s.fence(info, currentEpoch+1, "fencer"); - assertNotNull(resp); - } -} \ No newline at end of file