Merge trunk into branch.

Resolved conflicts generated by commit of HDFS-1580 in trunk:
- made EditLogInputStream.isInProgress public
- fixed trivial conflict in DFSConfigKeys


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1210666 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-12-05 22:10:35 +00:00
parent 6491444357
commit f39aac60e0
23 changed files with 642 additions and 72 deletions

View File

@ -3,7 +3,7 @@ Hadoop HDFS Change Log
Trunk (unreleased changes)
NEW FEATURES
HDFS-395. DFS Scalability: Incremental block reports. (Tomasz Nykiel
via hairong)
via hairong)
HDFS-2517. Add protobuf service for JounralProtocol. (suresh)
@ -13,6 +13,8 @@ Trunk (unreleased changes)
HDFS-2519. Add protobuf service for DatanodeProtocol. (suresh)
HDFS-2581. Implement protobuf service for JournalProtocol. (suresh)
IMPROVEMENTS
HADOOP-7524 Change RPC to allow multiple protocols including multuple
@ -72,6 +74,9 @@ Trunk (unreleased changes)
Move the support for multiple protocols to lower layer so that Writable,
PB and Avro can all use it (Sanjay)
HDFS-1580. Add interface for generic Write Ahead Logging mechanisms.
(Ivan Kelly via jitendra)
OPTIMIZATIONS
HDFS-2477. Optimize computing the diff between a block report and the
namenode state. (Tomasz Nykiel via hairong)

View File

@ -163,6 +163,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir";
public static final String DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir";
public static final String DFS_NAMENODE_SHARED_EDITS_DIR_KEY = "dfs.namenode.shared.edits.dir";
public static final String DFS_NAMENODE_EDITS_PLUGIN_PREFIX = "dfs.namenode.edits.journal-plugin";
public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size";
public static final String DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
public static final String DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.ipc.VersionedProtocol;
/**
* Protocol used to journal edits to a remote node. Currently,
* this is used to publish edits from the NameNode to a BackupNode.
*
* Note: This extends the protocolbuffer service based interface to
* add annotations required for security.
*/
@KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY,
clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
@ProtocolInfo(protocolName =
"org.apache.hadoop.hdfs.server.protocol.JournalProtocol",
protocolVersion = 1)
@InterfaceAudience.Private
public interface JournalProtocolPB extends
JournalProtocolService.BlockingInterface, VersionedProtocol {
/**
* This method is defined to get the protocol signature using
* the R23 protocol - hence we have added the suffix of 2 the method name
* to avoid conflict.
*/
public ProtocolSignatureWritable getProtocolSignature2(String protocol,
long clientVersion, int clientMethodsHash) throws IOException;
}

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.VersionedProtocol;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* Implementation for protobuf service that forwards requests
* received on {@link JournalProtocolPB} to the
* {@link JournalProtocol} server implementation.
*/
public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB {
/** Server side implementation to delegate the requests to */
private final JournalProtocol impl;
public JournalProtocolServerSideTranslatorPB(JournalProtocol impl) {
this.impl = impl;
}
/** @see JournalProtocol#journal */
@Override
public JournalResponseProto journal(RpcController unused,
JournalRequestProto req) throws ServiceException {
try {
impl.journal(PBHelper.convert(req.getRegistration()),
req.getFirstTxnId(), req.getNumTxns(), req.getRecords()
.toByteArray());
} catch (IOException e) {
throw new ServiceException(e);
}
return JournalResponseProto.newBuilder().build();
}
/** @see JournalProtocol#startLogSegment */
@Override
public StartLogSegmentResponseProto startLogSegment(RpcController controller,
StartLogSegmentRequestProto req) throws ServiceException {
try {
impl.startLogSegment(PBHelper.convert(req.getRegistration()),
req.getTxid());
} catch (IOException e) {
throw new ServiceException(e);
}
return StartLogSegmentResponseProto.newBuilder().build();
}
/** @see VersionedProtocol#getProtocolVersion */
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return RPC.getProtocolVersion(JournalProtocolPB.class);
}
/**
* The client side will redirect getProtocolSignature to
* getProtocolSignature2.
*
* However the RPC layer below on the Server side will call getProtocolVersion
* and possibly in the future getProtocolSignature. Hence we still implement
* it even though the end client will never call this method.
*
* @see VersionedProtocol#getProtocolSignature(String, long, int)
*/
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
/**
* Don't forward this to the server. The protocol version and signature is
* that of {@link JournalProtocol}
*/
if (!protocol.equals(RPC.getProtocolName(JournalProtocolPB.class))) {
throw new IOException("Namenode Serverside implements " +
RPC.getProtocolName(JournalProtocolPB.class) +
". The following requested protocol is unknown: " + protocol);
}
return ProtocolSignature.getProtocolSignature(clientMethodsHash,
RPC.getProtocolVersion(JournalProtocolPB.class),
JournalProtocolPB.class);
}
@Override
public ProtocolSignatureWritable getProtocolSignature2(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
/**
* Don't forward this to the server. The protocol version and signature is
* that of {@link JournalPBProtocol}
*/
return ProtocolSignatureWritable.convert(
this.getProtocolSignature(protocol, clientVersion, clientMethodsHash));
}
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class is the client side translator to translate the requests made on
* {@link JournalProtocol} interfaces to the RPC server implementing
* {@link JournalProtocolPB}.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class JournalProtocolTranslatorPB implements JournalProtocol, Closeable {
/** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null;
private final JournalProtocolPB rpcProxy;
public JournalProtocolTranslatorPB(InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, JournalProtocolPB.class, ProtobufRpcEngine.class);
rpcProxy = RPC.getProxy(JournalProtocolPB.class,
JournalProtocol.versionID, nameNodeAddr, conf);
}
@Override
public void close() {
RPC.stopProxy(rpcProxy);
}
@Override
public long getProtocolVersion(String protocolName, long clientVersion)
throws IOException {
return 0;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2(
protocol, clientVersion, clientMethodsHash));
}
@Override
public void journal(NamenodeRegistration reg, long firstTxnId,
int numTxns, byte[] records) throws IOException {
JournalRequestProto req = JournalRequestProto.newBuilder()
.setRegistration(PBHelper.convert(reg))
.setFirstTxnId(firstTxnId)
.setNumTxns(numTxns)
.setRecords(PBHelper.getByteString(records))
.build();
try {
rpcProxy.journal(NULL_CONTROLLER, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void startLogSegment(NamenodeRegistration registration, long txid)
throws IOException {
StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setTxid(txid)
.build();
try {
rpcProxy.startLogSegment(NULL_CONTROLLER, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import com.google.protobuf.ByteString;
/**
* Utilities for converting protobuf classes to and from
* implementation classes.
*/
class PBHelper {
private PBHelper() {
/** Hidden constructor */
}
public static ByteString getByteString(byte[] bytes) {
return ByteString.copyFrom(bytes);
}
public static NamenodeRole convert(NamenodeRoleProto role) {
switch (role) {
case NAMENODE:
return NamenodeRole.NAMENODE;
case BACKUP:
return NamenodeRole.BACKUP;
case CHECKPOINT:
return NamenodeRole.CHECKPOINT;
}
return null;
}
public static NamenodeRoleProto convert(NamenodeRole role) {
switch (role) {
case NAMENODE:
return NamenodeRoleProto.NAMENODE;
case BACKUP:
return NamenodeRoleProto.BACKUP;
case CHECKPOINT:
return NamenodeRoleProto.CHECKPOINT;
}
return null;
}
public static StorageInfoProto convert(StorageInfo info) {
return StorageInfoProto.newBuilder().setClusterID(info.getClusterID())
.setCTime(info.getCTime())
.setLayoutVersion(info.getLayoutVersion())
.setNamespceID(info.getNamespaceID())
.build();
}
public static StorageInfo convert(StorageInfoProto info) {
return new StorageInfo(info.getLayoutVersion(), info.getNamespceID(),
info.getClusterID(), info.getCTime());
}
public static NamenodeRegistrationProto convert(NamenodeRegistration reg) {
return NamenodeRegistrationProto.newBuilder()
.setHttpAddress(reg.getHttpAddress())
.setRole(convert(reg.getRole()))
.setRpcAddress(reg.getAddress())
.setStorageInfo(convert((StorageInfo) reg)).build();
}
public static NamenodeRegistration convert(NamenodeRegistrationProto reg) {
return new NamenodeRegistration(reg.getRpcAddress(), reg.getHttpAddress(),
convert(reg.getStorageInfo()), convert(reg.getRole()));
}
}

View File

@ -0,0 +1,62 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<title>Protocol Buffers based data types for NN protocols</title>
</head>
<body>
<p>
The Protocol Buffers data types for NN protocols that use PB go in this package.
</p>
<h1>Steps to add a new protocol</h1>
<hr/>
<ol>
<li>Define the protobuf service for the protocol in &lt;ProtocolName&gt;.proto class.
<ul>
<li>This file should include both the protobuf service definition and the types
used for request and response. For example see - NamenodeProtocol.proto
<li>The naming convention for the protobuf service is &lt;ProtocolName&gt;Service.
Example: NamenodeProtocolService.
<li>Every RPC method takes a request and returns a response. The request
naming convention is &lt;MethodName&gt;RequestProto. The response naming convention
is &lt;MethodName&gt;ResponseProto.
</ul>
<li>Generate java files from the proto file using protoc tool.
<li>Define server side interface that extends BlockingInterface from the
generated files (Example: NamenodeProtocolService.BlockingInterface)
and VersionedProtocol. See NamenodePBProtocol.java for example.
<li>Define client side translator to translate the client protocol to
protobuf. See NamenodeProtocolTranslator.
<li>Define server side implementation that implements the server side interface.
This implementation receives the protobuf requests and delegates it to the
server side implementation. See NamenodePBProtocolImpl for example.
<li>Make changes to register this protocol at the server. See the other
protocols on how this is done.
</ol>
<h1>Steps to make changes to the existing protocol in a compatible way</h1>
<hr/>
<ol>
<li>Adding new methods is a compatible change.</li>
<li>When modifying an existing method, do not change the required parameters
to optional or optional parameters to required. Only add optional parameters
to the request and response.</li>
<li>When modifying an existing type, do not change the required parameters
to optional or optional parameters to require and optional parameters to
required. Only add optional parameters to the request and response.</li>
</ol>

View File

@ -1,29 +0,0 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<title>Protocol Buffers based data types for NN protocols</title>
</head>
<body>
<p>
The Protocol Buffers data types for NN protocols that use
PB go in this package.
</p>

View File

@ -103,7 +103,7 @@ public void close() throws IOException {
}
@Override
long length() throws IOException {
public long length() throws IOException {
// file size + size of both buffers
return inner.length();
}
@ -135,7 +135,7 @@ public long getLastTxId() throws IOException {
}
@Override
boolean isInProgress() {
public boolean isInProgress() {
return true;
}
}

View File

@ -67,12 +67,12 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
}
@Override // EditLogOutputStream
void write(FSEditLogOp op) throws IOException {
public void write(FSEditLogOp op) throws IOException {
doubleBuf.writeOp(op);
}
@Override
void writeRaw(byte[] bytes, int offset, int length) throws IOException {
public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
throw new IOException("Not supported");
}
@ -80,7 +80,7 @@ void writeRaw(byte[] bytes, int offset, int length) throws IOException {
* There is no persistent storage. Just clear the buffers.
*/
@Override // EditLogOutputStream
void create() throws IOException {
public void create() throws IOException {
assert doubleBuf.isFlushed() : "previous data is not flushed yet";
this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
}
@ -106,7 +106,7 @@ public void abort() throws IOException {
}
@Override // EditLogOutputStream
void setReadyToFlush() throws IOException {
public void setReadyToFlush() throws IOException {
doubleBuf.setReadyToFlush();
}

View File

@ -129,13 +129,13 @@ public void close() throws IOException {
}
@Override
long length() throws IOException {
public long length() throws IOException {
// file size + size of both buffers
return file.length();
}
@Override
boolean isInProgress() {
public boolean isInProgress() {
return isInProgress;
}

View File

@ -73,7 +73,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
/** {@inheritDoc} */
@Override
void write(FSEditLogOp op) throws IOException {
public void write(FSEditLogOp op) throws IOException {
doubleBuf.writeOp(op);
}
@ -86,7 +86,7 @@ void write(FSEditLogOp op) throws IOException {
* </ul>
* */
@Override
void writeRaw(byte[] bytes, int offset, int length) throws IOException {
public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
doubleBuf.writeRaw(bytes, offset, length);
}
@ -94,7 +94,7 @@ void writeRaw(byte[] bytes, int offset, int length) throws IOException {
* Create empty edits logs file.
*/
@Override
void create() throws IOException {
public void create() throws IOException {
fc.truncate(0);
fc.position(0);
doubleBuf.getCurrentBuf().writeInt(HdfsConstants.LAYOUT_VERSION);
@ -150,7 +150,7 @@ public void abort() throws IOException {
* data can be still written to the stream while flushing is performed.
*/
@Override
void setReadyToFlush() throws IOException {
public void setReadyToFlush() throws IOException {
doubleBuf.getCurrentBuf().write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
doubleBuf.setReadyToFlush();
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.Closeable;
import java.io.IOException;
@ -79,7 +81,7 @@ public abstract class EditLogInputStream implements JournalStream, Closeable {
/**
* Return the size of the current edits log.
*/
abstract long length() throws IOException;
public abstract long length() throws IOException;
/**
* Return true if this stream is in progress, false if it is finalized.

View File

@ -21,17 +21,21 @@
import static org.apache.hadoop.hdfs.server.common.Util.now;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A generic abstract class to support journaling of edits logs into
* a persistent storage.
*/
abstract class EditLogOutputStream {
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class EditLogOutputStream {
// these are statistics counters
private long numSync; // number of sync(s) to disk
private long totalTimeSync; // total time to sync
EditLogOutputStream() {
public EditLogOutputStream() throws IOException {
numSync = totalTimeSync = 0;
}
@ -41,7 +45,7 @@ abstract class EditLogOutputStream {
* @param op operation
* @throws IOException
*/
abstract void write(FSEditLogOp op) throws IOException;
abstract public void write(FSEditLogOp op) throws IOException;
/**
* Write raw data to an edit log. This data should already have
@ -54,7 +58,7 @@ abstract class EditLogOutputStream {
* @param length number of bytes to write
* @throws IOException
*/
abstract void writeRaw(byte[] bytes, int offset, int length)
abstract public void writeRaw(byte[] bytes, int offset, int length)
throws IOException;
/**
@ -62,7 +66,7 @@ abstract void writeRaw(byte[] bytes, int offset, int length)
*
* @throws IOException
*/
abstract void create() throws IOException;
abstract public void create() throws IOException;
/**
* Close the journal.
@ -81,7 +85,7 @@ abstract void writeRaw(byte[] bytes, int offset, int length)
* All data that has been written to the stream so far will be flushed.
* New data can be still written to the stream while flushing is performed.
*/
abstract void setReadyToFlush() throws IOException;
abstract public void setReadyToFlush() throws IOException;
/**
* Flush and sync all data that is ready to be flush

View File

@ -24,6 +24,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.lang.reflect.Constructor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -31,6 +32,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
@ -122,6 +124,7 @@ private enum State {
private NameNodeMetrics metrics;
private NNStorage storage;
private Configuration conf;
private static class TransactionId {
public long txid;
@ -163,6 +166,7 @@ protected synchronized TransactionId initialValue() {
* @param editsDirs List of journals to use
*/
FSEditLog(Configuration conf, NNStorage storage, Collection<URI> editsDirs) {
this.conf = conf;
isSyncRunning = false;
this.storage = storage;
metrics = NameNode.getNameNodeMetrics();
@ -210,9 +214,13 @@ public void initSharedJournalsForRead() {
private void initJournals(Collection<URI> dirs) {
this.journalSet = new JournalSet();
for (URI u : dirs) {
StorageDirectory sd = storage.getStorageDirectory(u);
if (sd != null) {
journalSet.add(new FileJournalManager(sd));
if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
StorageDirectory sd = storage.getStorageDirectory(u);
if (sd != null) {
journalSet.add(new FileJournalManager(sd));
}
} else {
journalSet.add(createJournal(u));
}
}
@ -1053,4 +1061,53 @@ static void closeAllStreams(Iterable<EditLogInputStream> streams) {
IOUtils.closeStream(s);
}
}
/**
* Retrieve the implementation class for a Journal scheme.
* @param conf The configuration to retrieve the information from
* @param uriScheme The uri scheme to look up.
* @return the class of the journal implementation
* @throws IllegalArgumentException if no class is configured for uri
*/
static Class<? extends JournalManager> getJournalClass(Configuration conf,
String uriScheme) {
String key
= DFSConfigKeys.DFS_NAMENODE_EDITS_PLUGIN_PREFIX + "." + uriScheme;
Class <? extends JournalManager> clazz = null;
try {
clazz = conf.getClass(key, null, JournalManager.class);
} catch (RuntimeException re) {
throw new IllegalArgumentException(
"Invalid class specified for " + uriScheme, re);
}
if (clazz == null) {
LOG.warn("No class configured for " +uriScheme
+ ", " + key + " is empty");
throw new IllegalArgumentException(
"No class configured for " + uriScheme);
}
return clazz;
}
/**
* Construct a custom journal manager.
* The class to construct is taken from the configuration.
* @param uri Uri to construct
* @return The constructed journal manager
* @throws IllegalArgumentException if no class is configured for uri
*/
private JournalManager createJournal(URI uri) {
Class<? extends JournalManager> clazz
= getJournalClass(conf, uri.getScheme());
try {
Constructor<? extends JournalManager> cons
= clazz.getConstructor(Configuration.class, URI.class);
return cons.newInstance(conf, uri);
} catch (Exception e) {
throw new IllegalArgumentException("Unable to construct journal, "
+ uri, e);
}
}
}

View File

@ -25,6 +25,8 @@
import java.util.Arrays;
import java.util.EnumMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -57,6 +59,8 @@
import org.apache.hadoop.hdfs.util.Holder;
import com.google.common.base.Joiner;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSEditLogLoader {
private final FSNamesystem fsNamesys;
@ -514,7 +518,7 @@ long getNumTransactions() {
/**
* Stream wrapper that keeps track of the current stream position.
*/
static class PositionTrackingInputStream extends FilterInputStream {
public static class PositionTrackingInputStream extends FilterInputStream {
private long curPos = 0;
private long markPos = -1;

View File

@ -113,6 +113,10 @@ private FSEditLogOp(FSEditLogOpCodes opCode) {
this.txid = 0;
}
public long getTransactionId() {
return txid;
}
public void setTransactionId(long txid) {
this.txid = txid;
}

View File

@ -20,6 +20,8 @@
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A JournalManager is responsible for managing a single place of storing
@ -28,7 +30,9 @@
* each conceptual place of storage corresponds to exactly one instance of
* this class, which is created when the EditLog is first opened.
*/
interface JournalManager extends Closeable {
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface JournalManager extends Closeable {
/**
* Begin writing to a new segment of the log stream, which starts at
* the given transaction ID.
@ -71,7 +75,6 @@ long getNumberOfTransactions(long fromTxnId)
*
* @param minTxIdToKeep the earliest txid that must be retained after purging
* old logs
* @param purger the purging implementation to use
* @throws IOException if purging fails
*/
void purgeLogsOlderThan(long minTxIdToKeep)

View File

@ -309,7 +309,7 @@ private class JournalSetOutputStream extends EditLogOutputStream {
}
@Override
void write(final FSEditLogOp op)
public void write(final FSEditLogOp op)
throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
@ -322,7 +322,7 @@ public void apply(JournalAndStream jas) throws IOException {
}
@Override
void writeRaw(final byte[] data, final int offset, final int length)
public void writeRaw(final byte[] data, final int offset, final int length)
throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
@ -335,7 +335,7 @@ public void apply(JournalAndStream jas) throws IOException {
}
@Override
void create() throws IOException {
public void create() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
@ -367,7 +367,7 @@ public void apply(JournalAndStream jas) throws IOException {
}
@Override
void setReadyToFlush() throws IOException {
public void setReadyToFlush() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {

View File

@ -71,7 +71,8 @@ public class NNStorage extends Storage implements Closeable {
private static final Log LOG = LogFactory.getLog(NNStorage.class.getName());
static final String DEPRECATED_MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
static final String LOCAL_URI_SCHEME = "file";
//
// The filenames used for storing the images
//
@ -338,22 +339,14 @@ StorageDirectory getStorageDirectory(URI uri) {
/**
* Checks the consistency of a URI, in particular if the scheme
* is specified and is supported by a concrete implementation
* is specified
* @param u URI whose consistency is being checked.
*/
private static void checkSchemeConsistency(URI u) throws IOException {
String scheme = u.getScheme();
// the URI should have a proper scheme
if(scheme == null)
if(scheme == null) {
throw new IOException("Undefined scheme for " + u);
else {
try {
// the scheme should be enumerated as JournalType
JournalType.valueOf(scheme.toUpperCase());
} catch (IllegalArgumentException iae){
throw new IOException("Unknown scheme " + scheme +
". It should correspond to a JournalType enumeration value");
}
}
}

View File

@ -33,6 +33,8 @@
import org.apache.hadoop.hdfs.server.common.Util;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Collections2;
import com.google.common.base.Predicate;
/**
*
@ -69,7 +71,18 @@ public NameNodeResourceChecker(Configuration conf) throws IOException {
.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
addDirsToCheck(FSNamesystem.getNamespaceDirs(conf));
addDirsToCheck(FSNamesystem.getNamespaceEditsDirs(conf));
Collection<URI> localEditDirs = Collections2.filter(
FSNamesystem.getNamespaceEditsDirs(conf),
new Predicate<URI>() {
public boolean apply(URI input) {
if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
return true;
}
return false;
}
});
addDirsToCheck(localEditDirs);
addDirsToCheck(extraCheckedVolumes);
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import static junit.framework.Assert.*;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.junit.Test;
/**
* Tests for {@link PBHelper}
*/
public class TestPBHelper {
@Test
public void testConvertNamenodeRole() {
assertEquals(NamenodeRoleProto.BACKUP,
PBHelper.convert(NamenodeRole.BACKUP));
assertEquals(NamenodeRoleProto.CHECKPOINT,
PBHelper.convert(NamenodeRole.CHECKPOINT));
assertEquals(NamenodeRoleProto.NAMENODE,
PBHelper.convert(NamenodeRole.NAMENODE));
assertEquals(NamenodeRole.BACKUP,
PBHelper.convert(NamenodeRoleProto.BACKUP));
assertEquals(NamenodeRole.CHECKPOINT,
PBHelper.convert(NamenodeRoleProto.CHECKPOINT));
assertEquals(NamenodeRole.NAMENODE,
PBHelper.convert(NamenodeRoleProto.NAMENODE));
}
@Test
public void testConvertStoragInfo() {
StorageInfo info = new StorageInfo(1, 2, "cid", 3);
StorageInfoProto infoProto = PBHelper.convert(info);
StorageInfo info2 = PBHelper.convert(infoProto);
assertEquals(info.getClusterID(), info2.getClusterID());
assertEquals(info.getCTime(), info2.getCTime());
assertEquals(info.getLayoutVersion(), info2.getLayoutVersion());
assertEquals(info.getNamespaceID(), info2.getNamespaceID());
}
@Test
public void testConvertNamenodeRegistration() {
StorageInfo info = new StorageInfo(1, 2, "cid", 3);
NamenodeRegistration reg = new NamenodeRegistration("address:999",
"http:1000", info, NamenodeRole.NAMENODE);
NamenodeRegistrationProto regProto = PBHelper.convert(reg);
NamenodeRegistration reg2 = PBHelper.convert(regProto);
assertEquals(reg.getAddress(), reg2.getAddress());
assertEquals(reg.getClusterID(), reg2.getClusterID());
assertEquals(reg.getCTime(), reg2.getCTime());
assertEquals(reg.getHttpAddress(), reg2.getHttpAddress());
assertEquals(reg.getLayoutVersion(), reg2.getLayoutVersion());
assertEquals(reg.getNamespaceID(), reg2.getNamespaceID());
assertEquals(reg.getRegistrationID(), reg2.getRegistrationID());
assertEquals(reg.getRole(), reg2.getRole());
assertEquals(reg.getVersion(), reg2.getVersion());
}
}

View File

@ -773,7 +773,7 @@ public JournalType getType() {
}
@Override
boolean isInProgress() {
public boolean isInProgress() {
return true;
}
}