diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7bafb1f44d..ffeff8119d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -3,7 +3,7 @@ Hadoop HDFS Change Log Trunk (unreleased changes) NEW FEATURES HDFS-395. DFS Scalability: Incremental block reports. (Tomasz Nykiel - via hairong) + via hairong) HDFS-2517. Add protobuf service for JounralProtocol. (suresh) @@ -13,6 +13,8 @@ Trunk (unreleased changes) HDFS-2519. Add protobuf service for DatanodeProtocol. (suresh) + HDFS-2581. Implement protobuf service for JournalProtocol. (suresh) + IMPROVEMENTS HADOOP-7524 Change RPC to allow multiple protocols including multuple @@ -72,6 +74,9 @@ Trunk (unreleased changes) Move the support for multiple protocols to lower layer so that Writable, PB and Avro can all use it (Sanjay) + HDFS-1580. Add interface for generic Write Ahead Logging mechanisms. + (Ivan Kelly via jitendra) + OPTIMIZATIONS HDFS-2477. Optimize computing the diff between a block report and the namenode state. (Tomasz Nykiel via hairong) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 511adcfb17..1c26c4bc57 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -163,6 +163,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir"; public static final String DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir"; public static final String DFS_NAMENODE_SHARED_EDITS_DIR_KEY = "dfs.namenode.shared.edits.dir"; + public static final String DFS_NAMENODE_EDITS_PLUGIN_PREFIX = "dfs.namenode.edits.journal-plugin"; public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size"; public static final String DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base"; public static final String DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolPB.java new file mode 100644 index 0000000000..ebbdcb3d5c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolPB.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.ipc.VersionedProtocol; + +/** + * Protocol used to journal edits to a remote node. Currently, + * this is used to publish edits from the NameNode to a BackupNode. + * + * Note: This extends the protocolbuffer service based interface to + * add annotations required for security. + */ +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, + clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY) +@ProtocolInfo(protocolName = + "org.apache.hadoop.hdfs.server.protocol.JournalProtocol", + protocolVersion = 1) +@InterfaceAudience.Private +public interface JournalProtocolPB extends + JournalProtocolService.BlockingInterface, VersionedProtocol { + /** + * This method is defined to get the protocol signature using + * the R23 protocol - hence we have added the suffix of 2 the method name + * to avoid conflict. + */ + public ProtocolSignatureWritable getProtocolSignature2(String protocol, + long clientVersion, int clientMethodsHash) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000000..389bf154d3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.VersionedProtocol; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * Implementation for protobuf service that forwards requests + * received on {@link JournalProtocolPB} to the + * {@link JournalProtocol} server implementation. + */ +public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB { + /** Server side implementation to delegate the requests to */ + private final JournalProtocol impl; + + public JournalProtocolServerSideTranslatorPB(JournalProtocol impl) { + this.impl = impl; + } + + /** @see JournalProtocol#journal */ + @Override + public JournalResponseProto journal(RpcController unused, + JournalRequestProto req) throws ServiceException { + try { + impl.journal(PBHelper.convert(req.getRegistration()), + req.getFirstTxnId(), req.getNumTxns(), req.getRecords() + .toByteArray()); + } catch (IOException e) { + throw new ServiceException(e); + } + return JournalResponseProto.newBuilder().build(); + } + + /** @see JournalProtocol#startLogSegment */ + @Override + public StartLogSegmentResponseProto startLogSegment(RpcController controller, + StartLogSegmentRequestProto req) throws ServiceException { + try { + impl.startLogSegment(PBHelper.convert(req.getRegistration()), + req.getTxid()); + } catch (IOException e) { + throw new ServiceException(e); + } + return StartLogSegmentResponseProto.newBuilder().build(); + } + + /** @see VersionedProtocol#getProtocolVersion */ + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return RPC.getProtocolVersion(JournalProtocolPB.class); + } + + /** + * The client side will redirect getProtocolSignature to + * getProtocolSignature2. + * + * However the RPC layer below on the Server side will call getProtocolVersion + * and possibly in the future getProtocolSignature. Hence we still implement + * it even though the end client will never call this method. + * + * @see VersionedProtocol#getProtocolSignature(String, long, int) + */ + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link JournalProtocol} + */ + if (!protocol.equals(RPC.getProtocolName(JournalProtocolPB.class))) { + throw new IOException("Namenode Serverside implements " + + RPC.getProtocolName(JournalProtocolPB.class) + + ". The following requested protocol is unknown: " + protocol); + } + + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + RPC.getProtocolVersion(JournalProtocolPB.class), + JournalProtocolPB.class); + } + + + @Override + public ProtocolSignatureWritable getProtocolSignature2(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link JournalPBProtocol} + */ + return ProtocolSignatureWritable.convert( + this.getProtocolSignature(protocol, clientVersion, clientMethodsHash)); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java new file mode 100644 index 0000000000..adddf9a2f4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * This class is the client side translator to translate the requests made on + * {@link JournalProtocol} interfaces to the RPC server implementing + * {@link JournalProtocolPB}. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class JournalProtocolTranslatorPB implements JournalProtocol, Closeable { + /** RpcController is not used and hence is set to null */ + private final static RpcController NULL_CONTROLLER = null; + private final JournalProtocolPB rpcProxy; + + public JournalProtocolTranslatorPB(InetSocketAddress nameNodeAddr, + Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, JournalProtocolPB.class, ProtobufRpcEngine.class); + rpcProxy = RPC.getProxy(JournalProtocolPB.class, + JournalProtocol.versionID, nameNodeAddr, conf); + } + + @Override + public void close() { + RPC.stopProxy(rpcProxy); + } + + @Override + public long getProtocolVersion(String protocolName, long clientVersion) + throws IOException { + return 0; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2( + protocol, clientVersion, clientMethodsHash)); + } + + @Override + public void journal(NamenodeRegistration reg, long firstTxnId, + int numTxns, byte[] records) throws IOException { + JournalRequestProto req = JournalRequestProto.newBuilder() + .setRegistration(PBHelper.convert(reg)) + .setFirstTxnId(firstTxnId) + .setNumTxns(numTxns) + .setRecords(PBHelper.getByteString(records)) + .build(); + try { + rpcProxy.journal(NULL_CONTROLLER, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void startLogSegment(NamenodeRegistration registration, long txid) + throws IOException { + StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder() + .setRegistration(PBHelper.convert(registration)) + .setTxid(txid) + .build(); + try { + rpcProxy.startLogSegment(NULL_CONTROLLER, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java new file mode 100644 index 0000000000..598c7fb416 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; +import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; + +import com.google.protobuf.ByteString; + +/** + * Utilities for converting protobuf classes to and from + * implementation classes. + */ +class PBHelper { + private PBHelper() { + /** Hidden constructor */ + } + + public static ByteString getByteString(byte[] bytes) { + return ByteString.copyFrom(bytes); + } + + public static NamenodeRole convert(NamenodeRoleProto role) { + switch (role) { + case NAMENODE: + return NamenodeRole.NAMENODE; + case BACKUP: + return NamenodeRole.BACKUP; + case CHECKPOINT: + return NamenodeRole.CHECKPOINT; + } + return null; + } + + public static NamenodeRoleProto convert(NamenodeRole role) { + switch (role) { + case NAMENODE: + return NamenodeRoleProto.NAMENODE; + case BACKUP: + return NamenodeRoleProto.BACKUP; + case CHECKPOINT: + return NamenodeRoleProto.CHECKPOINT; + } + return null; + } + + public static StorageInfoProto convert(StorageInfo info) { + return StorageInfoProto.newBuilder().setClusterID(info.getClusterID()) + .setCTime(info.getCTime()) + .setLayoutVersion(info.getLayoutVersion()) + .setNamespceID(info.getNamespaceID()) + .build(); + } + + public static StorageInfo convert(StorageInfoProto info) { + return new StorageInfo(info.getLayoutVersion(), info.getNamespceID(), + info.getClusterID(), info.getCTime()); + } + + + public static NamenodeRegistrationProto convert(NamenodeRegistration reg) { + return NamenodeRegistrationProto.newBuilder() + .setHttpAddress(reg.getHttpAddress()) + .setRole(convert(reg.getRole())) + .setRpcAddress(reg.getAddress()) + .setStorageInfo(convert((StorageInfo) reg)).build(); + } + + public static NamenodeRegistration convert(NamenodeRegistrationProto reg) { + return new NamenodeRegistration(reg.getRpcAddress(), reg.getHttpAddress(), + convert(reg.getStorageInfo()), convert(reg.getRole())); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/overview.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/overview.html new file mode 100644 index 0000000000..cf620f379b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/overview.html @@ -0,0 +1,62 @@ + + + + + Protocol Buffers based data types for NN protocols + + +

+The Protocol Buffers data types for NN protocols that use PB go in this package. +

+

Steps to add a new protocol

+
+
    +
  1. Define the protobuf service for the protocol in <ProtocolName>.proto class. + +
  2. Generate java files from the proto file using protoc tool. +
  3. Define server side interface that extends BlockingInterface from the +generated files (Example: NamenodeProtocolService.BlockingInterface) +and VersionedProtocol. See NamenodePBProtocol.java for example. +
  4. Define client side translator to translate the client protocol to +protobuf. See NamenodeProtocolTranslator. +
  5. Define server side implementation that implements the server side interface. +This implementation receives the protobuf requests and delegates it to the +server side implementation. See NamenodePBProtocolImpl for example. +
  6. Make changes to register this protocol at the server. See the other +protocols on how this is done. +
+

Steps to make changes to the existing protocol in a compatible way

+
+
    +
  1. Adding new methods is a compatible change.
  2. +
  3. When modifying an existing method, do not change the required parameters +to optional or optional parameters to required. Only add optional parameters +to the request and response.
  4. +
  5. When modifying an existing type, do not change the required parameters +to optional or optional parameters to require and optional parameters to +required. Only add optional parameters to the request and response.
  6. +
+ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolProtocolBuffers/overview.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolProtocolBuffers/overview.html index 6d41cfdf5a..e69de29bb2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolProtocolBuffers/overview.html +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolProtocolBuffers/overview.html @@ -1,29 +0,0 @@ - - - - - Protocol Buffers based data types for NN protocols - - -

-The Protocol Buffers data types for NN protocols that use -PB go in this package. -

- - - diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java index 68bcdba6ed..a0fb8fe629 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java @@ -103,7 +103,7 @@ public void close() throws IOException { } @Override - long length() throws IOException { + public long length() throws IOException { // file size + size of both buffers return inner.length(); } @@ -135,7 +135,7 @@ public long getLastTxId() throws IOException { } @Override - boolean isInProgress() { + public boolean isInProgress() { return true; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java index 067990d01b..711fcce48e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java @@ -67,12 +67,12 @@ class EditLogBackupOutputStream extends EditLogOutputStream { } @Override // EditLogOutputStream - void write(FSEditLogOp op) throws IOException { + public void write(FSEditLogOp op) throws IOException { doubleBuf.writeOp(op); } @Override - void writeRaw(byte[] bytes, int offset, int length) throws IOException { + public void writeRaw(byte[] bytes, int offset, int length) throws IOException { throw new IOException("Not supported"); } @@ -80,7 +80,7 @@ void writeRaw(byte[] bytes, int offset, int length) throws IOException { * There is no persistent storage. Just clear the buffers. */ @Override // EditLogOutputStream - void create() throws IOException { + public void create() throws IOException { assert doubleBuf.isFlushed() : "previous data is not flushed yet"; this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE); } @@ -106,7 +106,7 @@ public void abort() throws IOException { } @Override // EditLogOutputStream - void setReadyToFlush() throws IOException { + public void setReadyToFlush() throws IOException { doubleBuf.setReadyToFlush(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java index 719ef78100..3857db236c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java @@ -129,13 +129,13 @@ public void close() throws IOException { } @Override - long length() throws IOException { + public long length() throws IOException { // file size + size of both buffers return file.length(); } @Override - boolean isInProgress() { + public boolean isInProgress() { return isInProgress; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java index 4780d04b00..13c76ae1e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java @@ -73,7 +73,7 @@ class EditLogFileOutputStream extends EditLogOutputStream { /** {@inheritDoc} */ @Override - void write(FSEditLogOp op) throws IOException { + public void write(FSEditLogOp op) throws IOException { doubleBuf.writeOp(op); } @@ -86,7 +86,7 @@ void write(FSEditLogOp op) throws IOException { * * */ @Override - void writeRaw(byte[] bytes, int offset, int length) throws IOException { + public void writeRaw(byte[] bytes, int offset, int length) throws IOException { doubleBuf.writeRaw(bytes, offset, length); } @@ -94,7 +94,7 @@ void writeRaw(byte[] bytes, int offset, int length) throws IOException { * Create empty edits logs file. */ @Override - void create() throws IOException { + public void create() throws IOException { fc.truncate(0); fc.position(0); doubleBuf.getCurrentBuf().writeInt(HdfsConstants.LAYOUT_VERSION); @@ -150,7 +150,7 @@ public void abort() throws IOException { * data can be still written to the stream while flushing is performed. */ @Override - void setReadyToFlush() throws IOException { + public void setReadyToFlush() throws IOException { doubleBuf.getCurrentBuf().write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker doubleBuf.setReadyToFlush(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java index c66977c071..2c4bdd53d0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import java.io.Closeable; import java.io.IOException; @@ -79,7 +81,7 @@ public abstract class EditLogInputStream implements JournalStream, Closeable { /** * Return the size of the current edits log. */ - abstract long length() throws IOException; + public abstract long length() throws IOException; /** * Return true if this stream is in progress, false if it is finalized. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java index 8681837de5..d0fc156801 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java @@ -21,17 +21,21 @@ import static org.apache.hadoop.hdfs.server.common.Util.now; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; /** * A generic abstract class to support journaling of edits logs into * a persistent storage. */ -abstract class EditLogOutputStream { +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class EditLogOutputStream { // these are statistics counters private long numSync; // number of sync(s) to disk private long totalTimeSync; // total time to sync - EditLogOutputStream() { + public EditLogOutputStream() throws IOException { numSync = totalTimeSync = 0; } @@ -41,7 +45,7 @@ abstract class EditLogOutputStream { * @param op operation * @throws IOException */ - abstract void write(FSEditLogOp op) throws IOException; + abstract public void write(FSEditLogOp op) throws IOException; /** * Write raw data to an edit log. This data should already have @@ -54,7 +58,7 @@ abstract class EditLogOutputStream { * @param length number of bytes to write * @throws IOException */ - abstract void writeRaw(byte[] bytes, int offset, int length) + abstract public void writeRaw(byte[] bytes, int offset, int length) throws IOException; /** @@ -62,7 +66,7 @@ abstract void writeRaw(byte[] bytes, int offset, int length) * * @throws IOException */ - abstract void create() throws IOException; + abstract public void create() throws IOException; /** * Close the journal. @@ -81,7 +85,7 @@ abstract void writeRaw(byte[] bytes, int offset, int length) * All data that has been written to the stream so far will be flushed. * New data can be still written to the stream while flushing is performed. */ - abstract void setReadyToFlush() throws IOException; + abstract public void setReadyToFlush() throws IOException; /** * Flush and sync all data that is ready to be flush diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index bfe971b5eb..aa16069ed1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.lang.reflect.Constructor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +32,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; @@ -122,6 +124,7 @@ private enum State { private NameNodeMetrics metrics; private NNStorage storage; + private Configuration conf; private static class TransactionId { public long txid; @@ -163,6 +166,7 @@ protected synchronized TransactionId initialValue() { * @param editsDirs List of journals to use */ FSEditLog(Configuration conf, NNStorage storage, Collection editsDirs) { + this.conf = conf; isSyncRunning = false; this.storage = storage; metrics = NameNode.getNameNodeMetrics(); @@ -210,9 +214,13 @@ public void initSharedJournalsForRead() { private void initJournals(Collection dirs) { this.journalSet = new JournalSet(); for (URI u : dirs) { - StorageDirectory sd = storage.getStorageDirectory(u); - if (sd != null) { - journalSet.add(new FileJournalManager(sd)); + if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) { + StorageDirectory sd = storage.getStorageDirectory(u); + if (sd != null) { + journalSet.add(new FileJournalManager(sd)); + } + } else { + journalSet.add(createJournal(u)); } } @@ -1053,4 +1061,53 @@ static void closeAllStreams(Iterable streams) { IOUtils.closeStream(s); } } + + /** + * Retrieve the implementation class for a Journal scheme. + * @param conf The configuration to retrieve the information from + * @param uriScheme The uri scheme to look up. + * @return the class of the journal implementation + * @throws IllegalArgumentException if no class is configured for uri + */ + static Class getJournalClass(Configuration conf, + String uriScheme) { + String key + = DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + "." + uriScheme; + Class clazz = null; + try { + clazz = conf.getClass(key, null, JournalManager.class); + } catch (RuntimeException re) { + throw new IllegalArgumentException( + "Invalid class specified for " + uriScheme, re); + } + + if (clazz == null) { + LOG.warn("No class configured for " +uriScheme + + ", " + key + " is empty"); + throw new IllegalArgumentException( + "No class configured for " + uriScheme); + } + return clazz; + } + + /** + * Construct a custom journal manager. + * The class to construct is taken from the configuration. + * @param uri Uri to construct + * @return The constructed journal manager + * @throws IllegalArgumentException if no class is configured for uri + */ + private JournalManager createJournal(URI uri) { + Class clazz + = getJournalClass(conf, uri.getScheme()); + + try { + Constructor cons + = clazz.getConstructor(Configuration.class, URI.class); + return cons.newInstance(conf, uri); + } catch (Exception e) { + throw new IllegalArgumentException("Unable to construct journal, " + + uri, e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 30b0b8c151..80aa115df1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -25,6 +25,8 @@ import java.util.Arrays; import java.util.EnumMap; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -57,6 +59,8 @@ import org.apache.hadoop.hdfs.util.Holder; import com.google.common.base.Joiner; +@InterfaceAudience.Private +@InterfaceStability.Evolving public class FSEditLogLoader { private final FSNamesystem fsNamesys; @@ -514,7 +518,7 @@ long getNumTransactions() { /** * Stream wrapper that keeps track of the current stream position. */ - static class PositionTrackingInputStream extends FilterInputStream { + public static class PositionTrackingInputStream extends FilterInputStream { private long curPos = 0; private long markPos = -1; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java index 3adb439329..61b4ef8a41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java @@ -113,6 +113,10 @@ private FSEditLogOp(FSEditLogOpCodes opCode) { this.txid = 0; } + public long getTransactionId() { + return txid; + } + public void setTransactionId(long txid) { this.txid = txid; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java index 348e3ef981..d45de18e92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java @@ -20,6 +20,8 @@ import java.io.Closeable; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; /** * A JournalManager is responsible for managing a single place of storing @@ -28,7 +30,9 @@ * each conceptual place of storage corresponds to exactly one instance of * this class, which is created when the EditLog is first opened. */ -interface JournalManager extends Closeable { +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface JournalManager extends Closeable { /** * Begin writing to a new segment of the log stream, which starts at * the given transaction ID. @@ -71,7 +75,6 @@ long getNumberOfTransactions(long fromTxnId) * * @param minTxIdToKeep the earliest txid that must be retained after purging * old logs - * @param purger the purging implementation to use * @throws IOException if purging fails */ void purgeLogsOlderThan(long minTxIdToKeep) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java index 45b5714082..8607364a56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java @@ -309,7 +309,7 @@ private class JournalSetOutputStream extends EditLogOutputStream { } @Override - void write(final FSEditLogOp op) + public void write(final FSEditLogOp op) throws IOException { mapJournalsAndReportErrors(new JournalClosure() { @Override @@ -322,7 +322,7 @@ public void apply(JournalAndStream jas) throws IOException { } @Override - void writeRaw(final byte[] data, final int offset, final int length) + public void writeRaw(final byte[] data, final int offset, final int length) throws IOException { mapJournalsAndReportErrors(new JournalClosure() { @Override @@ -335,7 +335,7 @@ public void apply(JournalAndStream jas) throws IOException { } @Override - void create() throws IOException { + public void create() throws IOException { mapJournalsAndReportErrors(new JournalClosure() { @Override public void apply(JournalAndStream jas) throws IOException { @@ -367,7 +367,7 @@ public void apply(JournalAndStream jas) throws IOException { } @Override - void setReadyToFlush() throws IOException { + public void setReadyToFlush() throws IOException { mapJournalsAndReportErrors(new JournalClosure() { @Override public void apply(JournalAndStream jas) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java index 7bddaeb5d1..118e4d26de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java @@ -71,7 +71,8 @@ public class NNStorage extends Storage implements Closeable { private static final Log LOG = LogFactory.getLog(NNStorage.class.getName()); static final String DEPRECATED_MESSAGE_DIGEST_PROPERTY = "imageMD5Digest"; - + static final String LOCAL_URI_SCHEME = "file"; + // // The filenames used for storing the images // @@ -338,22 +339,14 @@ StorageDirectory getStorageDirectory(URI uri) { /** * Checks the consistency of a URI, in particular if the scheme - * is specified and is supported by a concrete implementation + * is specified * @param u URI whose consistency is being checked. */ private static void checkSchemeConsistency(URI u) throws IOException { String scheme = u.getScheme(); // the URI should have a proper scheme - if(scheme == null) + if(scheme == null) { throw new IOException("Undefined scheme for " + u); - else { - try { - // the scheme should be enumerated as JournalType - JournalType.valueOf(scheme.toUpperCase()); - } catch (IllegalArgumentException iae){ - throw new IOException("Unknown scheme " + scheme + - ". It should correspond to a JournalType enumeration value"); - } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java index 4d7cfd8fa9..9283f92cad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hdfs.server.common.Util; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Collections2; +import com.google.common.base.Predicate; /** * @@ -69,7 +71,18 @@ public NameNodeResourceChecker(Configuration conf) throws IOException { .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY)); addDirsToCheck(FSNamesystem.getNamespaceDirs(conf)); - addDirsToCheck(FSNamesystem.getNamespaceEditsDirs(conf)); + + Collection localEditDirs = Collections2.filter( + FSNamesystem.getNamespaceEditsDirs(conf), + new Predicate() { + public boolean apply(URI input) { + if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) { + return true; + } + return false; + } + }); + addDirsToCheck(localEditDirs); addDirsToCheck(extraCheckedVolumes); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java new file mode 100644 index 0000000000..85aa91b914 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import static junit.framework.Assert.*; + +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; +import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.junit.Test; + +/** + * Tests for {@link PBHelper} + */ +public class TestPBHelper { + @Test + public void testConvertNamenodeRole() { + assertEquals(NamenodeRoleProto.BACKUP, + PBHelper.convert(NamenodeRole.BACKUP)); + assertEquals(NamenodeRoleProto.CHECKPOINT, + PBHelper.convert(NamenodeRole.CHECKPOINT)); + assertEquals(NamenodeRoleProto.NAMENODE, + PBHelper.convert(NamenodeRole.NAMENODE)); + assertEquals(NamenodeRole.BACKUP, + PBHelper.convert(NamenodeRoleProto.BACKUP)); + assertEquals(NamenodeRole.CHECKPOINT, + PBHelper.convert(NamenodeRoleProto.CHECKPOINT)); + assertEquals(NamenodeRole.NAMENODE, + PBHelper.convert(NamenodeRoleProto.NAMENODE)); + } + + @Test + public void testConvertStoragInfo() { + StorageInfo info = new StorageInfo(1, 2, "cid", 3); + StorageInfoProto infoProto = PBHelper.convert(info); + StorageInfo info2 = PBHelper.convert(infoProto); + assertEquals(info.getClusterID(), info2.getClusterID()); + assertEquals(info.getCTime(), info2.getCTime()); + assertEquals(info.getLayoutVersion(), info2.getLayoutVersion()); + assertEquals(info.getNamespaceID(), info2.getNamespaceID()); + } + + @Test + public void testConvertNamenodeRegistration() { + StorageInfo info = new StorageInfo(1, 2, "cid", 3); + NamenodeRegistration reg = new NamenodeRegistration("address:999", + "http:1000", info, NamenodeRole.NAMENODE); + NamenodeRegistrationProto regProto = PBHelper.convert(reg); + NamenodeRegistration reg2 = PBHelper.convert(regProto); + assertEquals(reg.getAddress(), reg2.getAddress()); + assertEquals(reg.getClusterID(), reg2.getClusterID()); + assertEquals(reg.getCTime(), reg2.getCTime()); + assertEquals(reg.getHttpAddress(), reg2.getHttpAddress()); + assertEquals(reg.getLayoutVersion(), reg2.getLayoutVersion()); + assertEquals(reg.getNamespaceID(), reg2.getNamespaceID()); + assertEquals(reg.getRegistrationID(), reg2.getRegistrationID()); + assertEquals(reg.getRole(), reg2.getRole()); + assertEquals(reg.getVersion(), reg2.getVersion()); + + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index a8a3ac4cb6..104d652788 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -773,7 +773,7 @@ public JournalType getType() { } @Override - boolean isInProgress() { + public boolean isInProgress() { return true; } }