Merge trunk into HDFS-6581

This commit is contained in:
arp 2014-09-02 16:25:32 -07:00
commit a6b32a3e78
62 changed files with 2968 additions and 154 deletions

.gitignore
View File

@ -9,6 +9,7 @@
.project
.settings
target
build
hadoop-common-project/hadoop-kms/downloads/
hadoop-hdfs-project/hadoop-hdfs/downloads
hadoop-hdfs-project/hadoop-hdfs-httpfs/downloads

View File

@ -127,6 +127,10 @@ Trunk (Unreleased)
HADOOP-11013. CLASSPATH handling should be consolidated, debuggable (aw)
HADOOP-11041. VersionInfo specifies subversion (Tsuyoshi OZAWA via aw)
HADOOP-10373 create tools/hadoop-amazon for aws/EMR support (stevel)
BUG FIXES
HADOOP-9451. Fault single-layer config if node group topology is enabled.
@ -324,6 +328,8 @@ Trunk (Unreleased)
HADOOP-10748. HttpServer2 should not load JspServlet. (wheat9)
HADOOP-11033. shell scripts ignore JAVA_HOME on OS X. (aw)
OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd)
@ -485,6 +491,8 @@ Release 2.6.0 - UNRELEASED
HADOOP-11030. Define a variable jackson.version instead of using constant
at multiple places. (Juan Yu via kasha)
HADOOP-10990. Add missed NFSv3 request and response classes (brandonli)
OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
@ -727,6 +735,11 @@ Release 2.6.0 - UNRELEASED
HADOOP-10911. hadoop.auth cookie after HADOOP-10710 still not proper
according to RFC2109. (gchanan via tucu)
HADOOP-11036. Add build directory to .gitignore (Tsuyoshi OZAWA via aw)
HADOOP-11012. hadoop fs -text of zero-length file causes EOFException
(Eric Payne via jlowe)
Release 2.5.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -517,10 +517,12 @@ function hadoop_os_tricks
# examples for OS X and Linux. Vendors, replace this with your special sauce.
case ${HADOOP_OS_TYPE} in
Darwin)
if [[ -x /usr/libexec/java_home ]]; then
export JAVA_HOME="$(/usr/libexec/java_home)"
else
export JAVA_HOME=/Library/Java/Home
if [[ -z "${JAVA_HOME}" ]]; then
if [[ -x /usr/libexec/java_home ]]; then
export JAVA_HOME="$(/usr/libexec/java_home)"
else
export JAVA_HOME=/Library/Java/Home
fi
fi
;;
Linux)

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.PerformanceAdvisory;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -48,7 +49,7 @@ public abstract class CryptoCodec implements Configurable {
*
* @param conf
* the configuration
* @param CipherSuite
* @param cipherSuite
* algorithm/mode/padding
* @return CryptoCodec the codec object. Null value will be returned if no
* crypto codec classes with cipher suite configured.
@ -66,15 +67,18 @@ public static CryptoCodec getInstance(Configuration conf,
CryptoCodec c = ReflectionUtils.newInstance(klass, conf);
if (c.getCipherSuite().getName().equals(cipherSuite.getName())) {
if (codec == null) {
LOG.debug("Using crypto codec {}.", klass.getName());
PerformanceAdvisory.LOG.debug("Using crypto codec {}.",
klass.getName());
codec = c;
}
} else {
LOG.warn("Crypto codec {} doesn't meet the cipher suite {}.",
PerformanceAdvisory.LOG.debug(
"Crypto codec {} doesn't meet the cipher suite {}.",
klass.getName(), cipherSuite.getName());
}
} catch (Exception e) {
LOG.warn("Crypto codec {} is not available.", klass.getName());
PerformanceAdvisory.LOG.debug("Crypto codec {} is not available.",
klass.getName());
}
}
@ -108,7 +112,8 @@ private static List<Class<? extends CryptoCodec>> getCodecClasses(
cipherSuite.getConfigSuffix();
String codecString = conf.get(configName);
if (codecString == null) {
LOG.warn("No crypto codec classes with cipher suite configured.");
PerformanceAdvisory.LOG.debug(
"No crypto codec classes with cipher suite configured.");
return null;
}
for (String c : Splitter.on(',').trimResults().omitEmptyStrings().
@ -117,9 +122,9 @@ private static List<Class<? extends CryptoCodec>> getCodecClasses(
Class<?> cls = conf.getClassByName(c);
result.add(cls.asSubclass(CryptoCodec.class));
} catch (ClassCastException e) {
LOG.warn("Class " + c + " is not a CryptoCodec.");
PerformanceAdvisory.LOG.debug("Class {} is not a CryptoCodec.", c);
} catch (ClassNotFoundException e) {
LOG.warn("Crypto codec " + c + " not found.");
PerformanceAdvisory.LOG.debug("Crypto codec {} not found.", c);
}
}
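
With this change the codec lookup logs through PerformanceAdvisory.LOG at debug level, so clients without a configured or native codec are no longer flooded with warnings. A minimal sketch of the call site this affects (the CipherSuite.AES_CTR_NOPADDING constant is assumed from the crypto package; it is not part of this diff):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoCodec;

public class CryptoCodecLookup {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Returns null when no codec class is configured for the suite; the
    // lookup itself now only emits debug-level messages.
    CryptoCodec codec = CryptoCodec.getInstance(conf, CipherSuite.AES_CTR_NOPADDING);
    System.out.println(codec == null
        ? "no crypto codec configured for AES/CTR/NoPadding"
        : "using codec " + codec.getClass().getName());
  }
}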

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.shell;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
@ -126,8 +127,17 @@ public static class Text extends Cat {
protected InputStream getInputStream(PathData item) throws IOException {
FSDataInputStream i = (FSDataInputStream)super.getInputStream(item);
// Handle 0 and 1-byte files
short leadBytes;
try {
leadBytes = i.readShort();
} catch (EOFException e) {
i.seek(0);
return i;
}
// Check type of stream first
switch(i.readShort()) {
switch(leadBytes) {
case 0x1f8b: { // RFC 1952
// Must be gzip
i.seek(0);
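
The try/catch matters because readShort() needs two bytes: a zero- or one-byte file used to surface an EOFException to the user, and is now rewound and returned as a plain stream. A small sketch of the same guard outside the shell (the path is hypothetical):

import java.io.EOFException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LeadByteCheck {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    try (FSDataInputStream in = fs.open(new Path("/tmp/tiny-file"))) { // hypothetical path
      try {
        short lead = in.readShort();   // needs at least 2 bytes; used for magic-number checks
        System.out.printf("lead bytes: 0x%04x%n", lead);
      } catch (EOFException e) {
        in.seek(0);                    // 0- or 1-byte file: fall back to the raw stream
        System.out.println("too short for a magic number; treating as plain text");
      }
    }
  }
}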

View File

@ -16,9 +16,10 @@
*/
package org.apache.hadoop.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PerformanceAdvisory {
public static final Log LOG = LogFactory.getLog(PerformanceAdvisory.class);
public static final Logger LOG =
LoggerFactory.getLogger(PerformanceAdvisory.class);
}

View File

@ -170,7 +170,8 @@ public static String getProtocVersion(){
public static void main(String[] args) {
LOG.debug("version: "+ getVersion());
System.out.println("Hadoop " + getVersion());
System.out.println("Subversion " + getUrl() + " -r " + getRevision());
System.out.println("Source code repository " + getUrl() + " -r " +
getRevision());
System.out.println("Compiled by " + getUser() + " on " + getDate());
System.out.println("Compiled with protoc " + getProtocVersion());
System.out.println("From source with checksum " + getSrcChecksum());

View File

@ -1,10 +1,11 @@
Package: libhadoop
Authors: Arun C Murthy <arunc@yahoo-inc.com>
MOTIVATION
The libhadoop package contains the native code for any of hadoop (http://hadoop.apache.org/core).
The libhadoop package contains the native code for Apache Hadoop (http://hadoop.apache.org/).
IMPROVEMENTS
Any suggestions for improvements or patched should be sent to core-dev@hadoop.apache.org. Please go through http://wiki.apache.org/hadoop/HowToContribute for more information on how to contribute.
Any suggestions for improvements or patches should be sent to common-dev@hadoop.apache.org.
Please see http://wiki.apache.org/hadoop/HowToContribute for more information on how to contribute.

View File

@ -42,29 +42,14 @@ public class TestTextCommand {
System.getProperty("test.build.data", "build/test/data/") + "/testText";
private static final String AVRO_FILENAME =
new Path(TEST_ROOT_DIR, "weather.avro").toUri().getPath();
private static final String TEXT_FILENAME =
new Path(TEST_ROOT_DIR, "testtextfile.txt").toUri().getPath();
/**
* Tests whether binary Avro data files are displayed correctly.
*/
@Test (timeout = 30000)
public void testDisplayForAvroFiles() throws Exception {
// Create a small Avro data file on the local file system.
createAvroFile(generateWeatherAvroBinaryData());
// Prepare and call the Text command's protected getInputStream method
// using reflection.
Configuration conf = new Configuration();
URI localPath = new URI(AVRO_FILENAME);
PathData pathData = new PathData(localPath, conf);
Display.Text text = new Display.Text();
text.setConf(conf);
Method method = text.getClass().getDeclaredMethod(
"getInputStream", PathData.class);
method.setAccessible(true);
InputStream stream = (InputStream) method.invoke(text, pathData);
String output = inputStreamToString(stream);
// Check the output.
String expectedOutput =
"{\"station\":\"011990-99999\",\"time\":-619524000000,\"temp\":0}" +
System.getProperty("line.separator") +
@ -77,18 +62,72 @@ public void testDisplayForAvroFiles() throws Exception {
"{\"station\":\"012650-99999\",\"time\":-655509600000,\"temp\":78}" +
System.getProperty("line.separator");
String output = readUsingTextCommand(AVRO_FILENAME,
generateWeatherAvroBinaryData());
assertEquals(expectedOutput, output);
}
/**
* Tests that a zero-length file is displayed correctly.
*/
@Test (timeout = 30000)
public void testEmptyTextFile() throws Exception {
byte[] emptyContents = { };
String output = readUsingTextCommand(TEXT_FILENAME, emptyContents);
assertTrue("".equals(output));
}
/**
* Tests that a one-byte file is displayed correctly.
*/
@Test (timeout = 30000)
public void testOneByteTextFile() throws Exception {
byte[] oneByteContents = { 'x' };
String output = readUsingTextCommand(TEXT_FILENAME, oneByteContents);
assertTrue(new String(oneByteContents).equals(output));
}
/**
* Tests that a two-byte file is displayed correctly.
*/
@Test (timeout = 30000)
public void testTwoByteTextFile() throws Exception {
byte[] twoByteContents = { 'x', 'y' };
String output = readUsingTextCommand(TEXT_FILENAME, twoByteContents);
assertTrue(new String(twoByteContents).equals(output));
}
// Create a file on the local file system and read it using
// the Display.Text class.
private String readUsingTextCommand(String fileName, byte[] fileContents)
throws Exception {
createFile(fileName, fileContents);
// Prepare and call the Text command's protected getInputStream method
// using reflection.
Configuration conf = new Configuration();
URI localPath = new URI(fileName);
PathData pathData = new PathData(localPath, conf);
Display.Text text = new Display.Text() {
@Override
public InputStream getInputStream(PathData item) throws IOException {
return super.getInputStream(item);
}
};
text.setConf(conf);
InputStream stream = (InputStream) text.getInputStream(pathData);
return inputStreamToString(stream);
}
private String inputStreamToString(InputStream stream) throws IOException {
StringWriter writer = new StringWriter();
IOUtils.copy(stream, writer);
return writer.toString();
}
private void createAvroFile(byte[] contents) throws IOException {
private void createFile(String fileName, byte[] contents) throws IOException {
(new File(TEST_ROOT_DIR)).mkdir();
File file = new File(AVRO_FILENAME);
File file = new File(fileName);
file.createNewFile();
FileOutputStream stream = new FileOutputStream(file);
stream.write(contents);

View File

@ -53,9 +53,19 @@ public class Nfs3FileAttributes {
* For Hadoop, currently this field is always zero.
*/
public static class Specdata3 {
final static int specdata1 = 0;
final static int specdata2 = 0;
final int specdata1;
final int specdata2;
public Specdata3() {
specdata1 = 0;
specdata2 = 0;
}
public Specdata3(int specdata1, int specdata2) {
this.specdata1 = specdata1;
this.specdata2 = specdata2;
}
public int getSpecdata1() {
return specdata1;
}
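
Specdata3 can now carry real values instead of constant zeros; in NFSv3 these two fields conventionally hold a device's major and minor numbers for MKNOD of character or block devices. A tiny illustrative construction (the 8/1 pair is made up):

import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes.Specdata3;

public class SpecdataExample {
  public static void main(String[] args) {
    Specdata3 spec = new Specdata3(8, 1);   // e.g. a block device's major/minor numbers
    System.out.println(spec.getSpecdata1() + ":" + spec.getSpecdata2());
  }
}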

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.nfs.nfs3.request;
import java.io.IOException;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.oncrpc.XDR;
/**
* LINK3 Request
*/
public class LINK3Request extends RequestWithHandle {
private final FileHandle fromDirHandle;
private final String fromName;
public LINK3Request(FileHandle handle, FileHandle fromDirHandle,
String fromName) {
super(handle);
this.fromDirHandle = fromDirHandle;
this.fromName = fromName;
}
public static LINK3Request deserialize(XDR xdr) throws IOException {
FileHandle handle = readHandle(xdr);
FileHandle fromDirHandle = readHandle(xdr);
String fromName = xdr.readString();
return new LINK3Request(handle, fromDirHandle, fromName);
}
public FileHandle getFromDirHandle() {
return fromDirHandle;
}
public String getFromName() {
return fromName;
}
@Override
public void serialize(XDR xdr) {
handle.serialize(xdr);
fromDirHandle.serialize(xdr);
xdr.writeInt(fromName.length());
xdr.writeFixedOpaque(fromName.getBytes(), fromName.length());
}
}
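
A brief sketch of constructing and XDR-encoding a LINK3Request; the handle IDs are arbitrary, and the FileHandle(long) constructor and no-arg XDR constructor are assumed from elsewhere in this module:

import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.request.LINK3Request;
import org.apache.hadoop.oncrpc.XDR;

public class Link3RequestSketch {
  public static void main(String[] args) {
    FileHandle target = new FileHandle(1001L);   // existing file to link to
    FileHandle fromDir = new FileHandle(2002L);  // directory receiving the new link
    LINK3Request req = new LINK3Request(target, fromDir, "hardlink-name");

    XDR xdr = new XDR();
    req.serialize(xdr);  // target handle, then fromDir handle, then the name as opaque bytes
    System.out.println("serialized LINK3 request for " + req.getFromName());
  }
}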

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.nfs.nfs3.request;
import java.io.IOException;
import org.apache.hadoop.nfs.NfsFileType;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes.Specdata3;
import org.apache.hadoop.oncrpc.XDR;
/**
* MKNOD3 Request
*/
public class MKNOD3Request extends RequestWithHandle {
private final String name;
private int type;
private SetAttr3 objAttr = null;
private Specdata3 spec = null;
public MKNOD3Request(FileHandle handle, String name, int type,
SetAttr3 objAttr, Specdata3 spec) {
super(handle);
this.name = name;
this.type = type;
this.objAttr = objAttr;
this.spec = spec;
}
public static MKNOD3Request deserialize(XDR xdr) throws IOException {
FileHandle handle = readHandle(xdr);
String name = xdr.readString();
int type = xdr.readInt();
SetAttr3 objAttr = new SetAttr3();
Specdata3 spec = null;
if (type == NfsFileType.NFSCHR.toValue()
|| type == NfsFileType.NFSBLK.toValue()) {
objAttr.deserialize(xdr);
spec = new Specdata3(xdr.readInt(), xdr.readInt());
} else if (type == NfsFileType.NFSSOCK.toValue()
|| type == NfsFileType.NFSFIFO.toValue()) {
objAttr.deserialize(xdr);
}
return new MKNOD3Request(handle, name, type, objAttr, spec);
}
public String getName() {
return name;
}
public int getType() {
return type;
}
public SetAttr3 getObjAttr() {
return objAttr;
}
public Specdata3 getSpec() {
return spec;
}
@Override
public void serialize(XDR xdr) {
handle.serialize(xdr);
xdr.writeInt(name.length());
xdr.writeFixedOpaque(name.getBytes(), name.length());
objAttr.serialize(xdr);
if (spec != null) {
xdr.writeInt(spec.getSpecdata1());
xdr.writeInt(spec.getSpecdata2());
}
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.nfs.nfs3.response;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.Verifier;
public class LINK3Response extends NFS3Response {
private final WccData fromDirWcc;
private final WccData linkDirWcc;
public LINK3Response(int status) {
this(status, new WccData(null, null), new WccData(null, null));
}
public LINK3Response(int status, WccData fromDirWcc,
WccData linkDirWcc) {
super(status);
this.fromDirWcc = fromDirWcc;
this.linkDirWcc = linkDirWcc;
}
public WccData getFromDirWcc() {
return fromDirWcc;
}
public WccData getLinkDirWcc() {
return linkDirWcc;
}
@Override
public XDR writeHeaderAndResponse(XDR out, int xid, Verifier verifier) {
super.writeHeaderAndResponse(out, xid, verifier);
fromDirWcc.serialize(out);
linkDirWcc.serialize(out);
return out;
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.nfs.nfs3.response;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.Verifier;
public class MKNOD3Response extends NFS3Response {
private final FileHandle objFileHandle;
private final Nfs3FileAttributes objPostOpAttr;
private final WccData dirWcc;
public MKNOD3Response(int status) {
this(status, null, null, new WccData(null, null));
}
public MKNOD3Response(int status, FileHandle handle,
Nfs3FileAttributes attrs, WccData dirWcc) {
super(status);
this.objFileHandle = handle;
this.objPostOpAttr = attrs;
this.dirWcc = dirWcc;
}
public FileHandle getObjFileHandle() {
return objFileHandle;
}
public Nfs3FileAttributes getObjPostOpAttr() {
return objPostOpAttr;
}
public WccData getDirWcc() {
return dirWcc;
}
@Override
public XDR writeHeaderAndResponse(XDR out, int xid, Verifier verifier) {
super.writeHeaderAndResponse(out, xid, verifier);
if (this.getStatus() == Nfs3Status.NFS3_OK) {
out.writeBoolean(true);
objFileHandle.serialize(out);
out.writeBoolean(true);
objPostOpAttr.serialize(out);
}
dirWcc.serialize(out);
return out;
}
}

View File

@ -421,7 +421,7 @@ public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
if (existantWriteCtx != null) {
if (!existantWriteCtx.getReplied()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Repeated write request which hasn't be served: xid="
LOG.debug("Repeated write request which hasn't been served: xid="
+ xid + ", drop it.");
}
} else {
@ -579,7 +579,7 @@ private void processOverWrite(DFSClient dfsClient, WRITE3Request request,
* writing, and there is no other threads writing (i.e., asyncStatus is
* false), start the writing and set asyncStatus to true.
*
* @return True if the new write is sequencial and we can start writing
* @return True if the new write is sequential and we can start writing
* (including the case that there is already a thread writing).
*/
private synchronized boolean checkAndStartWrite(
@ -898,7 +898,7 @@ private synchronized WriteCtx offerNextToWrite() {
long offset = nextOffset.get();
if (range.getMin() > offset) {
if (LOG.isDebugEnabled()) {
LOG.debug("The next sequencial write has not arrived yet");
LOG.debug("The next sequential write has not arrived yet");
}
processCommits(nextOffset.get()); // handle race
this.asyncStatus = false;

View File

@ -1423,7 +1423,7 @@ private DirectoryListing listPaths(DFSClient dfsClient, String dirFileIdPath,
throw io;
}
// This happens when startAfter was just deleted
LOG.info("Cookie cound't be found: " + new String(startAfter)
LOG.info("Cookie couldn't be found: " + new String(startAfter)
+ ", do listing from beginning");
dlisting = dfsClient
.listPaths(dirFileIdPath, HdfsFileStatus.EMPTY_NAME);

View File

@ -430,6 +430,11 @@ Release 2.6.0 - UNRELEASED
HDFS-6774. Make FsDataset and DataStore support removing volumes. (Lei Xu
via atm)
HDFS-6634. inotify in HDFS. (James Thomas via wang)
HDFS-4257. The ReplaceDatanodeOnFailure policies could have a forgiving
option (szetszwo via cmccabe)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
@ -580,6 +585,8 @@ Release 2.6.0 - UNRELEASED
HDFS-6972. TestRefreshUserMappings.testRefreshSuperUserGroupsConfiguration
doesn't decode url correctly. (Yongjun Zhang via wang)
HDFS-6942. Fix typos in log messages. (Ray Chiang via wheat9)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an
@ -670,7 +677,8 @@ Release 2.6.0 - UNRELEASED
HDFS-6817. Fix findbugs and other warnings. (yliu)
HDFS-6839. Fix TestCLI to expect new output. (clamb)
--
HDFS-6954. With crypto, no native lib systems are too verbose. (clamb via wang)
Release 2.5.1 - UNRELEASED

View File

@ -106,6 +106,15 @@
<Field name="metrics" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!--
We use a separate lock to protect modifications to journalSet so that
FSEditLog#selectInputStreams does not need to be a synchronized method.
-->
<Match>
<Class name="org.apache.hadoop.hdfs.server.namenode.FSEditLog" />
<Field name="journalSet" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!--
This method isn't performance-critical and is much clearer to write as it's written.
-->

View File

@ -309,6 +309,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<include>fsimage.proto</include>
<include>hdfs.proto</include>
<include>encryption.proto</include>
<include>inotify.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>

View File

@ -168,6 +168,11 @@ public void setMaxOpSize(int maxOpSize) {
reader.setMaxOpSize(maxOpSize);
}
@Override
public boolean isLocalLog() {
return false;
}
/**
* Input stream implementation which can be used by
* FSEditLogOp.Reader

View File

@ -606,10 +606,12 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
cipherSuites.add(codec.getCipherSuite());
}
provider = DFSUtil.createKeyProviderCryptoExtension(conf);
if (provider == null) {
LOG.info("No KeyProvider found.");
} else {
LOG.info("Found KeyProvider: " + provider.toString());
if (LOG.isDebugEnabled()) {
if (provider == null) {
LOG.debug("No KeyProvider found.");
} else {
LOG.debug("Found KeyProvider: " + provider.toString());
}
}
int numResponseToDrop = conf.getInt(
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
@ -2990,6 +2992,15 @@ public void checkAccess(String src, FsAction mode) throws IOException {
}
}
public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
return new DFSInotifyEventInputStream(namenode);
}
public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
throws IOException {
return new DFSInotifyEventInputStream(namenode, lastReadTxid);
}
@Override // RemotePeerFactory
public Peer newConnectedPeer(InetSocketAddress addr,
Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)

View File

@ -53,6 +53,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY = "dfs.client.block.write.replace-datanode-on-failure.best-effort";
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT = false;
public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
@ -678,4 +680,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY =
"dfs.datanode.block.id.layout.upgrade.threads";
public static final int DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS = 12;
public static final String DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY =
"dfs.namenode.inotify.max.events.per.rpc";
public static final int DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT =
1000;
}
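
Both groups of keys added here are set like any other client configuration. A minimal sketch (the values are examples, not recommendations):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class NewClientKeys {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Keep writing on the remaining datanodes if replacing a failed one also fails.
    conf.setBoolean(
        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY, true);
    // Cap how many inotify events the NameNode returns per RPC (default 1000).
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY, 500);
  }
}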

View File

@ -0,0 +1,220 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.UncheckedExecutionException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventsList;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Stream for reading inotify events. DFSInotifyEventInputStreams should not
* be shared among multiple threads.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class DFSInotifyEventInputStream {
public static Logger LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream
.class);
private final ClientProtocol namenode;
private Iterator<Event> it;
private long lastReadTxid;
/**
* The most recent txid the NameNode told us it has sync'ed -- helps us
* determine how far behind we are in the edit stream.
*/
private long syncTxid;
/**
* Used to generate wait times in {@link DFSInotifyEventInputStream#take()}.
*/
private Random rng = new Random();
private static final int INITIAL_WAIT_MS = 10;
DFSInotifyEventInputStream(ClientProtocol namenode) throws IOException {
this(namenode, namenode.getCurrentEditLogTxid()); // only consider new txn's
}
DFSInotifyEventInputStream(ClientProtocol namenode, long lastReadTxid)
throws IOException {
this.namenode = namenode;
this.it = Iterators.emptyIterator();
this.lastReadTxid = lastReadTxid;
}
/**
* Returns the next event in the stream or null if no new events are currently
* available.
*
* @throws IOException because of network error or edit log
* corruption. Also possible if JournalNodes are unresponsive in the
* QJM setting (even one unresponsive JournalNode is enough in rare cases),
* so catching this exception and retrying at least a few times is
* recommended.
* @throws MissingEventsException if we cannot return the next event in the
* stream because the data for the event (and possibly some subsequent events)
* has been deleted (generally because this stream is a very large number of
* events behind the current state of the NameNode). It is safe to continue
* reading from the stream after this exception is thrown -- the next
* available event will be returned.
*/
public Event poll() throws IOException, MissingEventsException {
// need to keep retrying until the NN sends us the latest committed txid
if (lastReadTxid == -1) {
LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
lastReadTxid = namenode.getCurrentEditLogTxid();
return null;
}
if (!it.hasNext()) {
EventsList el = namenode.getEditsFromTxid(lastReadTxid + 1);
if (el.getLastTxid() != -1) {
// we only want to set syncTxid when we were actually able to read some
// edits on the NN -- otherwise it will seem like edits are being
// generated faster than we can read them when the problem is really
// that we are temporarily unable to read edits
syncTxid = el.getSyncTxid();
it = el.getEvents().iterator();
long formerLastReadTxid = lastReadTxid;
lastReadTxid = el.getLastTxid();
if (el.getFirstTxid() != formerLastReadTxid + 1) {
throw new MissingEventsException(formerLastReadTxid + 1,
el.getFirstTxid());
}
} else {
LOG.debug("poll(): read no edits from the NN when requesting edits " +
"after txid {}", lastReadTxid);
return null;
}
}
if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
// newly seen edit log ops actually got converted to events
return it.next();
} else {
return null;
}
}
/**
* Return an estimate of how many events behind the NameNode's current state
* this stream is. Clients should periodically call this method and check if
* its result is steadily increasing, which indicates that they are falling
* behind (i.e. events are being generated faster than the client is reading
* them). If a client falls too far behind events may be deleted before the
* client can read them.
* <p/>
* A return value of -1 indicates that an estimate could not be produced, and
* should be ignored. The value returned by this method is really only useful
* when compared to previous or subsequent returned values.
*/
public long getEventsBehindEstimate() {
if (syncTxid == 0) {
return -1;
} else {
assert syncTxid >= lastReadTxid;
// this gives the difference between the last txid we have fetched to the
// client and syncTxid at the time we last fetched events from the
// NameNode
return syncTxid - lastReadTxid;
}
}
/**
* Returns the next event in the stream, waiting up to the specified amount of
* time for a new event. Returns null if a new event is not available at the
* end of the specified amount of time. The time before the method returns may
* exceed the specified amount of time by up to the time required for an RPC
* to the NameNode.
*
* @param time number of units of the given TimeUnit to wait
* @param tu the desired TimeUnit
* @throws IOException see {@link DFSInotifyEventInputStream#poll()}
* @throws MissingEventsException
* see {@link DFSInotifyEventInputStream#poll()}
* @throws InterruptedException if the calling thread is interrupted
*/
public Event poll(long time, TimeUnit tu) throws IOException,
InterruptedException, MissingEventsException {
long initialTime = Time.monotonicNow();
long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
long nextWait = INITIAL_WAIT_MS;
Event next = null;
while ((next = poll()) == null) {
long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
if (timeLeft <= 0) {
LOG.debug("timed poll(): timed out");
break;
} else if (timeLeft < nextWait * 2) {
nextWait = timeLeft;
} else {
nextWait *= 2;
}
LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
nextWait);
Thread.sleep(nextWait);
}
return next;
}
/**
* Returns the next event in the stream, waiting indefinitely if a new event
* is not immediately available.
*
* @throws IOException see {@link DFSInotifyEventInputStream#poll()}
* @throws MissingEventsException see
* {@link DFSInotifyEventInputStream#poll()}
* @throws InterruptedException if the calling thread is interrupted
*/
public Event take() throws IOException, InterruptedException,
MissingEventsException {
Event next = null;
int nextWaitMin = INITIAL_WAIT_MS;
while ((next = poll()) == null) {
// sleep for a random period between nextWaitMin and nextWaitMin * 2
// to avoid stampedes at the NN if there are multiple clients
int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
Thread.sleep(sleepTime);
// the maximum sleep is 2 minutes
nextWaitMin = Math.min(60000, nextWaitMin * 2);
}
return next;
}
}
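
A sketch of the timed-poll loop described in the javadoc above, including the MissingEventsException handling that poll() recommends; the stream itself is assumed to have been obtained from DFSClient or HdfsAdmin:

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;

public class TimedPoll {
  static void pollOnce(DFSInotifyEventInputStream stream)
      throws IOException, InterruptedException {
    try {
      Event event = stream.poll(5, TimeUnit.SECONDS);  // null if nothing arrived in time
      if (event != null) {
        System.out.println("event type: " + event.getEventType());
      }
    } catch (MissingEventsException e) {
      // Events were purged along with old edits; reading can safely continue afterwards.
      System.err.println(e);
    }
  }
}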

View File

@ -1181,7 +1181,17 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
// Check if replace-datanode policy is satisfied.
if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
nodes, isAppend, isHflushed)) {
addDatanode2ExistingPipeline();
try {
addDatanode2ExistingPipeline();
} catch(IOException ioe) {
if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
throw ioe;
}
DFSClient.LOG.warn("Failed to replace datanode."
+ " Continue with the remaining datanodes since "
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY
+ " is set to true.", ioe);
}
}
// get a new generation stamp and an access token

View File

@ -1940,4 +1940,13 @@ public Void next(final FileSystem fs, final Path p)
}
}.resolve(this, absF);
}
public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
return dfs.getInotifyEventStream();
}
public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
throws IOException {
return dfs.getInotifyEventStream(lastReadTxid);
}
}

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@ -275,4 +276,53 @@ public RemoteIterator<EncryptionZone> listEncryptionZones()
throws IOException {
return dfs.listEncryptionZones();
}
/**
* Exposes a stream of namesystem events. Only events occurring after the
* stream is created are available.
* See {@link org.apache.hadoop.hdfs.DFSInotifyEventInputStream}
* for information on stream usage.
* See {@link org.apache.hadoop.hdfs.inotify.Event}
* for information on the available events.
* <p/>
* Inotify users may want to tune the following HDFS parameters to
* ensure that enough extra HDFS edits are saved to support inotify clients
* that fall behind the current state of the namespace while reading events.
* The default parameter values should generally be reasonable. If edits are
* deleted before their corresponding events can be read, clients will see a
* {@link org.apache.hadoop.hdfs.inotify.MissingEventsException} on
* {@link org.apache.hadoop.hdfs.DFSInotifyEventInputStream} method calls.
*
* It should generally be sufficient to tune these parameters:
* dfs.namenode.num.extra.edits.retained
* dfs.namenode.max.extra.edits.segments.retained
*
* Parameters that affect the number of created segments and the number of
* edits that are considered necessary (i.e., they do not count towards the
* dfs.namenode.num.extra.edits.retained quota):
* dfs.namenode.checkpoint.period
* dfs.namenode.checkpoint.txns
* dfs.namenode.num.checkpoints.retained
* dfs.ha.log-roll.period
* <p/>
* It is recommended that local journaling be configured
* (dfs.namenode.edits.dir) for inotify (in addition to a shared journal)
* so that edit transfers from the shared journal can be avoided.
*
* @throws IOException If there was an error obtaining the stream.
*/
public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
return dfs.getInotifyEventStream();
}
/**
* A version of {@link HdfsAdmin#getInotifyEventStream()} meant for advanced
* users who are aware of HDFS edits up to lastReadTxid (e.g. because they
* have access to an FSImage inclusive of lastReadTxid) and only want to read
* events after this point.
*/
public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
throws IOException {
return dfs.getInotifyEventStream(lastReadTxid);
}
}
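
An end-to-end sketch of tailing the namespace with the new API; the NameNode URI is a placeholder:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;

public class InotifyTail {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    HdfsAdmin admin = new HdfsAdmin(URI.create("hdfs://namenode:8020"), conf); // placeholder URI
    DFSInotifyEventInputStream stream = admin.getInotifyEventStream();
    while (true) {
      Event event = stream.take();   // blocks, backing off between polls
      System.out.println(event.getEventType() + " event received");
    }
  }
}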

View File

@ -0,0 +1,452 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.inotify;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import java.util.List;
/**
* Events sent by the inotify system. Note that no events are necessarily sent
* when a file is opened for read (although a MetadataUpdateEvent will be sent
* if the atime is updated).
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract class Event {
public static enum EventType {
CREATE, CLOSE, APPEND, RENAME, METADATA, UNLINK
}
private EventType eventType;
public EventType getEventType() {
return eventType;
}
public Event(EventType eventType) {
this.eventType = eventType;
}
/**
* Sent when a file is closed after append or create.
*/
public static class CloseEvent extends Event {
private String path;
private long fileSize;
private long timestamp;
public CloseEvent(String path, long fileSize, long timestamp) {
super(EventType.CLOSE);
this.path = path;
this.fileSize = fileSize;
this.timestamp = timestamp;
}
public String getPath() {
return path;
}
/**
* The size of the closed file in bytes. May be -1 if the size is not
* available (e.g. in the case of a close generated by a concat operation).
*/
public long getFileSize() {
return fileSize;
}
/**
* The time when this event occurred, in milliseconds since the epoch.
*/
public long getTimestamp() {
return timestamp;
}
}
/**
* Sent when a new file is created (including overwrite).
*/
public static class CreateEvent extends Event {
public static enum INodeType {
FILE, DIRECTORY, SYMLINK;
}
private INodeType iNodeType;
private String path;
private long ctime;
private int replication;
private String ownerName;
private String groupName;
private FsPermission perms;
private String symlinkTarget;
public static class Builder {
private INodeType iNodeType;
private String path;
private long ctime;
private int replication;
private String ownerName;
private String groupName;
private FsPermission perms;
private String symlinkTarget;
public Builder iNodeType(INodeType type) {
this.iNodeType = type;
return this;
}
public Builder path(String path) {
this.path = path;
return this;
}
public Builder ctime(long ctime) {
this.ctime = ctime;
return this;
}
public Builder replication(int replication) {
this.replication = replication;
return this;
}
public Builder ownerName(String ownerName) {
this.ownerName = ownerName;
return this;
}
public Builder groupName(String groupName) {
this.groupName = groupName;
return this;
}
public Builder perms(FsPermission perms) {
this.perms = perms;
return this;
}
public Builder symlinkTarget(String symlinkTarget) {
this.symlinkTarget = symlinkTarget;
return this;
}
public CreateEvent build() {
return new CreateEvent(this);
}
}
private CreateEvent(Builder b) {
super(EventType.CREATE);
this.iNodeType = b.iNodeType;
this.path = b.path;
this.ctime = b.ctime;
this.replication = b.replication;
this.ownerName = b.ownerName;
this.groupName = b.groupName;
this.perms = b.perms;
this.symlinkTarget = b.symlinkTarget;
}
public INodeType getiNodeType() {
return iNodeType;
}
public String getPath() {
return path;
}
/**
* Creation time of the file, directory, or symlink.
*/
public long getCtime() {
return ctime;
}
/**
* Replication is zero if the CreateEvent iNodeType is directory or symlink.
*/
public int getReplication() {
return replication;
}
public String getOwnerName() {
return ownerName;
}
public String getGroupName() {
return groupName;
}
public FsPermission getPerms() {
return perms;
}
/**
* Symlink target is null if the CreateEvent iNodeType is not symlink.
*/
public String getSymlinkTarget() {
return symlinkTarget;
}
}
/**
* Sent when there is an update to a directory or file (none of the metadata
* tracked here applies to symlinks) that is not associated with another
* inotify event. The tracked metadata includes atime/mtime, replication,
* owner/group, permissions, ACLs, and XAttributes. Fields not relevant to the
* metadataType of the MetadataUpdateEvent will be null or will have their default
* values.
*/
public static class MetadataUpdateEvent extends Event {
public static enum MetadataType {
TIMES, REPLICATION, OWNER, PERMS, ACLS, XATTRS;
}
private String path;
private MetadataType metadataType;
private long mtime;
private long atime;
private int replication;
private String ownerName;
private String groupName;
private FsPermission perms;
private List<AclEntry> acls;
private List<XAttr> xAttrs;
private boolean xAttrsRemoved;
public static class Builder {
private String path;
private MetadataType metadataType;
private long mtime;
private long atime;
private int replication;
private String ownerName;
private String groupName;
private FsPermission perms;
private List<AclEntry> acls;
private List<XAttr> xAttrs;
private boolean xAttrsRemoved;
public Builder path(String path) {
this.path = path;
return this;
}
public Builder metadataType(MetadataType type) {
this.metadataType = type;
return this;
}
public Builder mtime(long mtime) {
this.mtime = mtime;
return this;
}
public Builder atime(long atime) {
this.atime = atime;
return this;
}
public Builder replication(int replication) {
this.replication = replication;
return this;
}
public Builder ownerName(String ownerName) {
this.ownerName = ownerName;
return this;
}
public Builder groupName(String groupName) {
this.groupName = groupName;
return this;
}
public Builder perms(FsPermission perms) {
this.perms = perms;
return this;
}
public Builder acls(List<AclEntry> acls) {
this.acls = acls;
return this;
}
public Builder xAttrs(List<XAttr> xAttrs) {
this.xAttrs = xAttrs;
return this;
}
public Builder xAttrsRemoved(boolean xAttrsRemoved) {
this.xAttrsRemoved = xAttrsRemoved;
return this;
}
public MetadataUpdateEvent build() {
return new MetadataUpdateEvent(this);
}
}
private MetadataUpdateEvent(Builder b) {
super(EventType.METADATA);
this.path = b.path;
this.metadataType = b.metadataType;
this.mtime = b.mtime;
this.atime = b.atime;
this.replication = b.replication;
this.ownerName = b.ownerName;
this.groupName = b.groupName;
this.perms = b.perms;
this.acls = b.acls;
this.xAttrs = b.xAttrs;
this.xAttrsRemoved = b.xAttrsRemoved;
}
public String getPath() {
return path;
}
public MetadataType getMetadataType() {
return metadataType;
}
public long getMtime() {
return mtime;
}
public long getAtime() {
return atime;
}
public int getReplication() {
return replication;
}
public String getOwnerName() {
return ownerName;
}
public String getGroupName() {
return groupName;
}
public FsPermission getPerms() {
return perms;
}
/**
* The full set of ACLs currently associated with this file or directory.
* May be null if all ACLs were removed.
*/
public List<AclEntry> getAcls() {
return acls;
}
public List<XAttr> getxAttrs() {
return xAttrs;
}
/**
* Whether the xAttrs returned by getxAttrs() were removed (as opposed to
* added).
*/
public boolean isxAttrsRemoved() {
return xAttrsRemoved;
}
}
/**
* Sent when a file, directory, or symlink is renamed.
*/
public static class RenameEvent extends Event {
private String srcPath;
private String dstPath;
private long timestamp;
public RenameEvent(String srcPath, String dstPath, long timestamp) {
super(EventType.RENAME);
this.srcPath = srcPath;
this.dstPath = dstPath;
this.timestamp = timestamp;
}
public String getSrcPath() {
return srcPath;
}
public String getDstPath() {
return dstPath;
}
/**
* The time when this event occurred, in milliseconds since the epoch.
*/
public long getTimestamp() {
return timestamp;
}
}
/**
* Sent when an existing file is opened for append.
*/
public static class AppendEvent extends Event {
private String path;
public AppendEvent(String path) {
super(EventType.APPEND);
this.path = path;
}
public String getPath() {
return path;
}
}
/**
* Sent when a file, directory, or symlink is deleted.
*/
public static class UnlinkEvent extends Event {
private String path;
private long timestamp;
public UnlinkEvent(String path, long timestamp) {
super(EventType.UNLINK);
this.path = path;
this.timestamp = timestamp;
}
public String getPath() {
return path;
}
/**
* The time when this event occurred, in milliseconds since the epoch.
*/
public long getTimestamp() {
return timestamp;
}
}
}
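
Because Event is an abstract base with one subclass per type, consumers generally switch on getEventType() and downcast; a small sketch:

import org.apache.hadoop.hdfs.inotify.Event;

public class EventDispatch {
  static void handle(Event event) {
    switch (event.getEventType()) {
      case CREATE:
        Event.CreateEvent create = (Event.CreateEvent) event;
        System.out.println("created " + create.getPath() + " as " + create.getiNodeType());
        break;
      case RENAME:
        Event.RenameEvent rename = (Event.RenameEvent) event;
        System.out.println(rename.getSrcPath() + " -> " + rename.getDstPath());
        break;
      case UNLINK:
        System.out.println("deleted " + ((Event.UnlinkEvent) event).getPath());
        break;
      default:
        break;   // CLOSE, APPEND and METADATA are ignored in this sketch
    }
  }
}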

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.inotify;
import org.apache.hadoop.classification.InterfaceAudience;
import java.util.List;
/**
* Contains a set of events, the transaction ID in the edit log up to which we
* read to produce these events, and the first txid we observed when producing
* these events (the last of which is for the purpose of determining whether we
* have missed events due to edit deletion). Also contains the most recent txid
* that the NameNode has sync'ed, so the client can determine how far behind in
* the edit log it is.
*/
@InterfaceAudience.Private
public class EventsList {
private List<Event> events;
private long firstTxid;
private long lastTxid;
private long syncTxid;
public EventsList(List<Event> events, long firstTxid, long lastTxid,
long syncTxid) {
this.events = events;
this.firstTxid = firstTxid;
this.lastTxid = lastTxid;
this.syncTxid = syncTxid;
}
public List<Event> getEvents() {
return events;
}
public long getFirstTxid() {
return firstTxid;
}
public long getLastTxid() {
return lastTxid;
}
public long getSyncTxid() {
return syncTxid;
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.inotify;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MissingEventsException extends Exception {
private static final long serialVersionUID = 1L;
private long expectedTxid;
private long actualTxid;
public MissingEventsException() {}
public MissingEventsException(long expectedTxid, long actualTxid) {
this.expectedTxid = expectedTxid;
this.actualTxid = actualTxid;
}
public long getExpectedTxid() {
return expectedTxid;
}
public long getActualTxid() {
return actualTxid;
}
@Override
public String toString() {
return "We expected the next batch of events to start with transaction ID "
+ expectedTxid + ", but it instead started with transaction ID " +
actualTxid + ". Most likely the intervening transactions were cleaned "
+ "up as part of checkpointing.";
}
}

View File

@ -43,10 +43,13 @@
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventsList;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
@ -1372,4 +1375,19 @@ public List<XAttr> listXAttrs(String src)
*/
@Idempotent
public void checkAccess(String path, FsAction mode) throws IOException;
/**
* Get the highest txid the NameNode knows has been written to the edit
* log, or -1 if the NameNode's edit log is not yet open for write. Used as
* the starting point for the inotify event stream.
*/
@Idempotent
public long getCurrentEditLogTxid() throws IOException;
/**
* Get an ordered list of events corresponding to the edit log transactions
* from txid onwards.
*/
@Idempotent
public EventsList getEditsFromTxid(long txid) throws IOException;
}

View File

@ -29,26 +29,90 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public enum ReplaceDatanodeOnFailure {
/** The feature is disabled in the entire site. */
DISABLE,
/** Never add a new datanode. */
NEVER,
/**
* DEFAULT policy:
* Let r be the replication number.
* Let n be the number of existing datanodes.
* Add a new datanode only if r >= 3 and either
* (1) floor(r/2) >= n; or
* (2) r > n and the block is hflushed/appended.
*/
DEFAULT,
/** Always add a new datanode when an existing datanode is removed. */
ALWAYS;
public class ReplaceDatanodeOnFailure {
/** The replacement policies */
public enum Policy {
/** The feature is disabled in the entire site. */
DISABLE(Condition.FALSE),
/** Never add a new datanode. */
NEVER(Condition.FALSE),
/** @see ReplaceDatanodeOnFailure.Condition#DEFAULT */
DEFAULT(Condition.DEFAULT),
/** Always add a new datanode when an existing datanode is removed. */
ALWAYS(Condition.TRUE);
private final Condition condition;
private Policy(Condition condition) {
this.condition = condition;
}
Condition getCondition() {
return condition;
}
}
/** Datanode replacement condition */
private static interface Condition {
/** Return true unconditionally. */
static final Condition TRUE = new Condition() {
@Override
public boolean satisfy(short replication, DatanodeInfo[] existings,
int nExistings, boolean isAppend, boolean isHflushed) {
return true;
}
};
/** Return false unconditionally. */
static final Condition FALSE = new Condition() {
@Override
public boolean satisfy(short replication, DatanodeInfo[] existings,
int nExistings, boolean isAppend, boolean isHflushed) {
return false;
}
};
/**
* DEFAULT condition:
* Let r be the replication number.
* Let n be the number of existing datanodes.
* Add a new datanode only if r >= 3 and either
* (1) floor(r/2) >= n; or
* (2) r > n and the block is hflushed/appended.
*/
static final Condition DEFAULT = new Condition() {
@Override
public boolean satisfy(final short replication,
final DatanodeInfo[] existings, final int n, final boolean isAppend,
final boolean isHflushed) {
if (replication < 3) {
return false;
} else {
if (n <= (replication/2)) {
return true;
} else {
return isAppend || isHflushed;
}
}
}
};
/** Is the condition satisfied? */
public boolean satisfy(short replication, DatanodeInfo[] existings,
int nExistings, boolean isAppend, boolean isHflushed);
}
private final Policy policy;
private final boolean bestEffort;
public ReplaceDatanodeOnFailure(Policy policy, boolean bestEffort) {
this.policy = policy;
this.bestEffort = bestEffort;
}
/** Check if the feature is enabled. */
public void checkEnabled() {
if (this == DISABLE) {
if (policy == Policy.DISABLE) {
throw new UnsupportedOperationException(
"This feature is disabled. Please refer to "
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
@ -56,7 +120,20 @@ public void checkEnabled() {
}
}
/** Is the policy satisfied? */
/**
* Best effort means that the client will try to replace the failed datanode
* (provided that the policy is satisfied); however, it will continue the
* write operation if the datanode replacement also fails.
*
* @return Suppose the datanode replacement fails.
* false: An exception should be thrown so that the write will fail.
* true : The write should be resumed with the remaining datanodes.
*/
public boolean isBestEffort() {
return bestEffort;
}
/** Does it need a replacement according to the policy? */
public boolean satisfy(
final short replication, final DatanodeInfo[] existings,
final boolean isAppend, final boolean isHflushed) {
@ -64,40 +141,42 @@ public boolean satisfy(
if (n == 0 || n >= replication) {
//don't need to add datanode for any policy.
return false;
} else if (this == DISABLE || this == NEVER) {
return false;
} else if (this == ALWAYS) {
return true;
} else {
//DEFAULT
if (replication < 3) {
return false;
} else {
if (n <= (replication/2)) {
return true;
} else {
return isAppend || isHflushed;
}
}
return policy.getCondition().satisfy(
replication, existings, n, isAppend, isHflushed);
}
}
@Override
public String toString() {
return policy.toString();
}
/** Get the setting from configuration. */
public static ReplaceDatanodeOnFailure get(final Configuration conf) {
final Policy policy = getPolicy(conf);
final boolean bestEffort = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_DEFAULT);
return new ReplaceDatanodeOnFailure(policy, bestEffort);
}
private static Policy getPolicy(final Configuration conf) {
final boolean enabled = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
if (!enabled) {
return DISABLE;
return Policy.DISABLE;
}
final String policy = conf.get(
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
for(int i = 1; i < values().length; i++) {
final ReplaceDatanodeOnFailure rdof = values()[i];
if (rdof.name().equalsIgnoreCase(policy)) {
return rdof;
for(int i = 1; i < Policy.values().length; i++) {
final Policy p = Policy.values()[i];
if (p.name().equalsIgnoreCase(policy)) {
return p;
}
}
throw new HadoopIllegalArgumentException("Illegal configuration value for "
@ -106,12 +185,16 @@ public static ReplaceDatanodeOnFailure get(final Configuration conf) {
}
/** Write the setting to configuration. */
public void write(final Configuration conf) {
public static void write(final Policy policy,
final boolean bestEffort, final Configuration conf) {
conf.setBoolean(
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
this != DISABLE);
policy != Policy.DISABLE);
conf.set(
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
name());
policy.name());
conf.setBoolean(
DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY,
bestEffort);
}
}

View File

@ -91,12 +91,16 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetCurrentEditLogTxidRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetCurrentEditLogTxidResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
@ -1408,4 +1412,25 @@ public CheckAccessResponseProto checkAccess(RpcController controller,
}
return VOID_CHECKACCESS_RESPONSE;
}
public GetCurrentEditLogTxidResponseProto getCurrentEditLogTxid(RpcController controller,
GetCurrentEditLogTxidRequestProto req) throws ServiceException {
try {
return GetCurrentEditLogTxidResponseProto.newBuilder().setTxid(
server.getCurrentEditLogTxid()).build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public GetEditsFromTxidResponseProto getEditsFromTxid(RpcController controller,
GetEditsFromTxidRequestProto req) throws ServiceException {
try {
return PBHelper.convertEditsResponse(server.getEditsFromTxid(
req.getTxid()));
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.inotify.EventsList;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@ -95,10 +96,12 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetCurrentEditLogTxidRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
@ -159,6 +162,7 @@
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrRequestProto;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
@ -1430,4 +1434,25 @@ public void checkAccess(String path, FsAction mode) throws IOException {
throw ProtobufHelper.getRemoteException(e);
}
}
public long getCurrentEditLogTxid() throws IOException {
GetCurrentEditLogTxidRequestProto req = GetCurrentEditLogTxidRequestProto
.getDefaultInstance();
try {
return rpcProxy.getCurrentEditLogTxid(null, req).getTxid();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public EventsList getEditsFromTxid(long txid) throws IOException {
GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder()
.setTxid(txid).build();
try {
return PBHelper.convert(rpcProxy.getEditsFromTxid(null, req));
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File

@ -46,6 +46,8 @@
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventsList;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@ -96,6 +98,7 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeStorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto;
@ -158,6 +161,7 @@
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
import org.apache.hadoop.hdfs.protocol.proto.InotifyProtos;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto;
@ -2348,6 +2352,247 @@ public static ShmId convert(ShortCircuitShmIdProto shmId) {
return new ShmId(shmId.getHi(), shmId.getLo());
}
private static Event.CreateEvent.INodeType createTypeConvert(InotifyProtos.INodeType
type) {
switch (type) {
case I_TYPE_DIRECTORY:
return Event.CreateEvent.INodeType.DIRECTORY;
case I_TYPE_FILE:
return Event.CreateEvent.INodeType.FILE;
case I_TYPE_SYMLINK:
return Event.CreateEvent.INodeType.SYMLINK;
default:
return null;
}
}
private static InotifyProtos.MetadataUpdateType metadataUpdateTypeConvert(
Event.MetadataUpdateEvent.MetadataType type) {
switch (type) {
case TIMES:
return InotifyProtos.MetadataUpdateType.META_TYPE_TIMES;
case REPLICATION:
return InotifyProtos.MetadataUpdateType.META_TYPE_REPLICATION;
case OWNER:
return InotifyProtos.MetadataUpdateType.META_TYPE_OWNER;
case PERMS:
return InotifyProtos.MetadataUpdateType.META_TYPE_PERMS;
case ACLS:
return InotifyProtos.MetadataUpdateType.META_TYPE_ACLS;
case XATTRS:
return InotifyProtos.MetadataUpdateType.META_TYPE_XATTRS;
default:
return null;
}
}
private static Event.MetadataUpdateEvent.MetadataType metadataUpdateTypeConvert(
InotifyProtos.MetadataUpdateType type) {
switch (type) {
case META_TYPE_TIMES:
return Event.MetadataUpdateEvent.MetadataType.TIMES;
case META_TYPE_REPLICATION:
return Event.MetadataUpdateEvent.MetadataType.REPLICATION;
case META_TYPE_OWNER:
return Event.MetadataUpdateEvent.MetadataType.OWNER;
case META_TYPE_PERMS:
return Event.MetadataUpdateEvent.MetadataType.PERMS;
case META_TYPE_ACLS:
return Event.MetadataUpdateEvent.MetadataType.ACLS;
case META_TYPE_XATTRS:
return Event.MetadataUpdateEvent.MetadataType.XATTRS;
default:
return null;
}
}
private static InotifyProtos.INodeType createTypeConvert(Event.CreateEvent.INodeType
type) {
switch (type) {
case DIRECTORY:
return InotifyProtos.INodeType.I_TYPE_DIRECTORY;
case FILE:
return InotifyProtos.INodeType.I_TYPE_FILE;
case SYMLINK:
return InotifyProtos.INodeType.I_TYPE_SYMLINK;
default:
return null;
}
}
public static EventsList convert(GetEditsFromTxidResponseProto resp) throws
IOException {
List<Event> events = Lists.newArrayList();
for (InotifyProtos.EventProto p : resp.getEventsList().getEventsList()) {
switch(p.getType()) {
case EVENT_CLOSE:
InotifyProtos.CloseEventProto close =
InotifyProtos.CloseEventProto.parseFrom(p.getContents());
events.add(new Event.CloseEvent(close.getPath(), close.getFileSize(),
close.getTimestamp()));
break;
case EVENT_CREATE:
InotifyProtos.CreateEventProto create =
InotifyProtos.CreateEventProto.parseFrom(p.getContents());
events.add(new Event.CreateEvent.Builder()
.iNodeType(createTypeConvert(create.getType()))
.path(create.getPath())
.ctime(create.getCtime())
.ownerName(create.getOwnerName())
.groupName(create.getGroupName())
.perms(convert(create.getPerms()))
.replication(create.getReplication())
.symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
create.getSymlinkTarget()).build());
break;
case EVENT_METADATA:
InotifyProtos.MetadataUpdateEventProto meta =
InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents());
events.add(new Event.MetadataUpdateEvent.Builder()
.path(meta.getPath())
.metadataType(metadataUpdateTypeConvert(meta.getType()))
.mtime(meta.getMtime())
.atime(meta.getAtime())
.replication(meta.getReplication())
.ownerName(
meta.getOwnerName().isEmpty() ? null : meta.getOwnerName())
.groupName(
meta.getGroupName().isEmpty() ? null : meta.getGroupName())
.perms(meta.hasPerms() ? convert(meta.getPerms()) : null)
.acls(meta.getAclsList().isEmpty() ? null : convertAclEntry(
meta.getAclsList()))
.xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs(
meta.getXAttrsList()))
.xAttrsRemoved(meta.getXAttrsRemoved())
.build());
break;
case EVENT_RENAME:
InotifyProtos.RenameEventProto rename =
InotifyProtos.RenameEventProto.parseFrom(p.getContents());
events.add(new Event.RenameEvent(rename.getSrcPath(), rename.getDestPath(),
rename.getTimestamp()));
break;
case EVENT_APPEND:
InotifyProtos.AppendEventProto reopen =
InotifyProtos.AppendEventProto.parseFrom(p.getContents());
events.add(new Event.AppendEvent(reopen.getPath()));
break;
case EVENT_UNLINK:
InotifyProtos.UnlinkEventProto unlink =
InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
events.add(new Event.UnlinkEvent(unlink.getPath(), unlink.getTimestamp()));
break;
default:
throw new RuntimeException("Unexpected inotify event type: " +
p.getType());
}
}
return new EventsList(events, resp.getEventsList().getFirstTxid(),
resp.getEventsList().getLastTxid(), resp.getEventsList().getSyncTxid());
}
public static GetEditsFromTxidResponseProto convertEditsResponse(EventsList el) {
InotifyProtos.EventsListProto.Builder builder =
InotifyProtos.EventsListProto.newBuilder();
for (Event e : el.getEvents()) {
switch(e.getEventType()) {
case CLOSE:
Event.CloseEvent ce = (Event.CloseEvent) e;
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_CLOSE)
.setContents(
InotifyProtos.CloseEventProto.newBuilder()
.setPath(ce.getPath())
.setFileSize(ce.getFileSize())
.setTimestamp(ce.getTimestamp()).build().toByteString()
).build());
break;
case CREATE:
Event.CreateEvent ce2 = (Event.CreateEvent) e;
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_CREATE)
.setContents(
InotifyProtos.CreateEventProto.newBuilder()
.setType(createTypeConvert(ce2.getiNodeType()))
.setPath(ce2.getPath())
.setCtime(ce2.getCtime())
.setOwnerName(ce2.getOwnerName())
.setGroupName(ce2.getGroupName())
.setPerms(convert(ce2.getPerms()))
.setReplication(ce2.getReplication())
.setSymlinkTarget(ce2.getSymlinkTarget() == null ?
"" : ce2.getSymlinkTarget()).build().toByteString()
).build());
break;
case METADATA:
Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e;
InotifyProtos.MetadataUpdateEventProto.Builder metaB =
InotifyProtos.MetadataUpdateEventProto.newBuilder()
.setPath(me.getPath())
.setType(metadataUpdateTypeConvert(me.getMetadataType()))
.setMtime(me.getMtime())
.setAtime(me.getAtime())
.setReplication(me.getReplication())
.setOwnerName(me.getOwnerName() == null ? "" :
me.getOwnerName())
.setGroupName(me.getGroupName() == null ? "" :
me.getGroupName())
.addAllAcls(me.getAcls() == null ?
Lists.<AclEntryProto>newArrayList() :
convertAclEntryProto(me.getAcls()))
.addAllXAttrs(me.getxAttrs() == null ?
Lists.<XAttrProto>newArrayList() :
convertXAttrProto(me.getxAttrs()))
.setXAttrsRemoved(me.isxAttrsRemoved());
if (me.getPerms() != null) {
metaB.setPerms(convert(me.getPerms()));
}
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_METADATA)
.setContents(metaB.build().toByteString())
.build());
break;
case RENAME:
Event.RenameEvent re = (Event.RenameEvent) e;
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_RENAME)
.setContents(
InotifyProtos.RenameEventProto.newBuilder()
.setSrcPath(re.getSrcPath())
.setDestPath(re.getDstPath())
.setTimestamp(re.getTimestamp()).build().toByteString()
).build());
break;
case APPEND:
Event.AppendEvent re2 = (Event.AppendEvent) e;
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_APPEND)
.setContents(
InotifyProtos.AppendEventProto.newBuilder()
.setPath(re2.getPath()).build().toByteString()
).build());
break;
case UNLINK:
Event.UnlinkEvent ue = (Event.UnlinkEvent) e;
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_UNLINK)
.setContents(
InotifyProtos.UnlinkEventProto.newBuilder()
.setPath(ue.getPath())
.setTimestamp(ue.getTimestamp()).build().toByteString()
).build());
break;
default:
throw new RuntimeException("Unexpected inotify event: " + e);
}
}
builder.setFirstTxid(el.getFirstTxid());
builder.setLastTxid(el.getLastTxid());
builder.setSyncTxid(el.getSyncTxid());
return GetEditsFromTxidResponseProto.newBuilder().setEventsList(
builder.build()).build();
}
public static HdfsProtos.CipherSuite convert(CipherSuite suite) {
switch (suite) {
case UNKNOWN:

View File

@ -79,7 +79,17 @@ public class IPCLoggerChannel implements AsyncLogger {
protected final InetSocketAddress addr;
private QJournalProtocol proxy;
private final ListeningExecutorService executor;
/**
* Executes tasks submitted to it serially, on a single thread, in FIFO order
* (generally used for write tasks that should not be reordered).
*/
private final ListeningExecutorService singleThreadExecutor;
/**
* Executes tasks submitted to it in parallel with each other and with those
* submitted to singleThreadExecutor (generally used for read tasks that can
* be safely reordered and interleaved with writes).
*/
private final ListeningExecutorService parallelExecutor;
private long ipcSerial = 0;
private long epoch = -1;
private long committedTxId = HdfsConstants.INVALID_TXID;
@ -160,8 +170,10 @@ public IPCLoggerChannel(Configuration conf,
DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY,
DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT);
executor = MoreExecutors.listeningDecorator(
createExecutor());
singleThreadExecutor = MoreExecutors.listeningDecorator(
createSingleThreadExecutor());
parallelExecutor = MoreExecutors.listeningDecorator(
createParallelExecutor());
metrics = IPCLoggerChannelMetrics.create(this);
}
@ -183,7 +195,8 @@ public synchronized void setCommittedTxId(long txid) {
@Override
public void close() {
// No more tasks may be submitted after this point.
executor.shutdown();
singleThreadExecutor.shutdown();
parallelExecutor.shutdown();
if (proxy != null) {
// TODO: this can hang for quite some time if the client
// is currently in the middle of a call to a downed JN.
@ -230,15 +243,30 @@ public QJournalProtocol run() throws IOException {
* Separated out for easy overriding in tests.
*/
@VisibleForTesting
protected ExecutorService createExecutor() {
protected ExecutorService createSingleThreadExecutor() {
return Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Logger channel to " + addr)
.setNameFormat("Logger channel (from single-thread executor) to " +
addr)
.setUncaughtExceptionHandler(
UncaughtExceptionHandlers.systemExit())
.build());
}
/**
* Separated out for easy overriding in tests.
*/
@VisibleForTesting
protected ExecutorService createParallelExecutor() {
return Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Logger channel (from parallel executor) to " + addr)
.setUncaughtExceptionHandler(
UncaughtExceptionHandlers.systemExit())
.build());
}
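As an aside, a self-contained sketch (illustrative class and method names, not part of the patch) of the split-executor pattern these two factory methods set up, using the same Guava types:
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
class TwoExecutorSketch {
  // ordered work (e.g. journal writes): one thread, FIFO submission order
  private final ListeningExecutorService serial =
      MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
  // unordered work (e.g. manifest reads): may interleave with the writes
  private final ListeningExecutorService parallel =
      MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
  <T> ListenableFuture<T> submitOrdered(Callable<T> task) {
    return serial.submit(task);
  }
  <T> ListenableFuture<T> submitUnordered(Callable<T> task) {
    return parallel.submit(task);
  }
}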
@Override
public URL buildURLToFetchLogs(long segmentTxId) {
@ -286,7 +314,7 @@ public synchronized boolean isOutOfSync() {
@VisibleForTesting
void waitForAllPendingCalls() throws InterruptedException {
try {
executor.submit(new Runnable() {
singleThreadExecutor.submit(new Runnable() {
@Override
public void run() {
}
@ -299,7 +327,7 @@ public void run() {
@Override
public ListenableFuture<Boolean> isFormatted() {
return executor.submit(new Callable<Boolean>() {
return singleThreadExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws IOException {
return getProxy().isFormatted(journalId);
@ -309,7 +337,7 @@ public Boolean call() throws IOException {
@Override
public ListenableFuture<GetJournalStateResponseProto> getJournalState() {
return executor.submit(new Callable<GetJournalStateResponseProto>() {
return singleThreadExecutor.submit(new Callable<GetJournalStateResponseProto>() {
@Override
public GetJournalStateResponseProto call() throws IOException {
GetJournalStateResponseProto ret =
@ -323,7 +351,7 @@ public GetJournalStateResponseProto call() throws IOException {
@Override
public ListenableFuture<NewEpochResponseProto> newEpoch(
final long epoch) {
return executor.submit(new Callable<NewEpochResponseProto>() {
return singleThreadExecutor.submit(new Callable<NewEpochResponseProto>() {
@Override
public NewEpochResponseProto call() throws IOException {
return getProxy().newEpoch(journalId, nsInfo, epoch);
@ -347,7 +375,7 @@ public ListenableFuture<Void> sendEdits(
ListenableFuture<Void> ret = null;
try {
ret = executor.submit(new Callable<Void>() {
ret = singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
throwIfOutOfSync();
@ -464,7 +492,7 @@ private synchronized void unreserveQueueSpace(int size) {
@Override
public ListenableFuture<Void> format(final NamespaceInfo nsInfo) {
return executor.submit(new Callable<Void>() {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
getProxy().format(journalId, nsInfo);
@ -476,7 +504,7 @@ public Void call() throws Exception {
@Override
public ListenableFuture<Void> startLogSegment(final long txid,
final int layoutVersion) {
return executor.submit(new Callable<Void>() {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().startLogSegment(createReqInfo(), txid, layoutVersion);
@ -497,7 +525,7 @@ public Void call() throws IOException {
@Override
public ListenableFuture<Void> finalizeLogSegment(
final long startTxId, final long endTxId) {
return executor.submit(new Callable<Void>() {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
throwIfOutOfSync();
@ -510,7 +538,7 @@ public Void call() throws IOException {
@Override
public ListenableFuture<Void> purgeLogsOlderThan(final long minTxIdToKeep) {
return executor.submit(new Callable<Void>() {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep);
@ -522,7 +550,7 @@ public Void call() throws Exception {
@Override
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
final long fromTxnId, final boolean inProgressOk) {
return executor.submit(new Callable<RemoteEditLogManifest>() {
return parallelExecutor.submit(new Callable<RemoteEditLogManifest>() {
@Override
public RemoteEditLogManifest call() throws IOException {
GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
@ -538,7 +566,7 @@ public RemoteEditLogManifest call() throws IOException {
@Override
public ListenableFuture<PrepareRecoveryResponseProto> prepareRecovery(
final long segmentTxId) {
return executor.submit(new Callable<PrepareRecoveryResponseProto>() {
return singleThreadExecutor.submit(new Callable<PrepareRecoveryResponseProto>() {
@Override
public PrepareRecoveryResponseProto call() throws IOException {
if (!hasHttpServerEndPoint()) {
@ -556,7 +584,7 @@ public PrepareRecoveryResponseProto call() throws IOException {
@Override
public ListenableFuture<Void> acceptRecovery(
final SegmentStateProto log, final URL url) {
return executor.submit(new Callable<Void>() {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().acceptRecovery(createReqInfo(), log, url);
@ -567,7 +595,7 @@ public Void call() throws IOException {
@Override
public ListenableFuture<Void> doPreUpgrade() {
return executor.submit(new Callable<Void>() {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().doPreUpgrade(journalId);
@ -578,7 +606,7 @@ public Void call() throws IOException {
@Override
public ListenableFuture<Void> doUpgrade(final StorageInfo sInfo) {
return executor.submit(new Callable<Void>() {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().doUpgrade(journalId, sInfo);
@ -589,7 +617,7 @@ public Void call() throws IOException {
@Override
public ListenableFuture<Void> doFinalize() {
return executor.submit(new Callable<Void>() {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().doFinalize(journalId);
@ -601,7 +629,7 @@ public Void call() throws IOException {
@Override
public ListenableFuture<Boolean> canRollBack(final StorageInfo storage,
final StorageInfo prevStorage, final int targetLayoutVersion) {
return executor.submit(new Callable<Boolean>() {
return singleThreadExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws IOException {
return getProxy().canRollBack(journalId, storage, prevStorage,
@ -612,7 +640,7 @@ public Boolean call() throws IOException {
@Override
public ListenableFuture<Void> doRollback() {
return executor.submit(new Callable<Void>() {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().doRollback(journalId);
@ -623,7 +651,7 @@ public Void call() throws IOException {
@Override
public ListenableFuture<Void> discardSegments(final long startTxId) {
return executor.submit(new Callable<Void>() {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().discardSegments(journalId, startTxId);
@ -634,7 +662,7 @@ public Void call() throws IOException {
@Override
public ListenableFuture<Long> getJournalCTime() {
return executor.submit(new Callable<Long>() {
return singleThreadExecutor.submit(new Callable<Long>() {
@Override
public Long call() throws IOException {
return getProxy().getJournalCTime(journalId);

View File

@ -651,7 +651,8 @@ public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
}
}
if (log != null && log.isInProgress()) {
logs.add(new RemoteEditLog(log.getStartTxId(), getHighestWrittenTxId()));
logs.add(new RemoteEditLog(log.getStartTxId(), getHighestWrittenTxId(),
true));
}
}

View File

@ -1744,7 +1744,7 @@ private class DataTransfer implements Runnable {
+ b + " (numBytes=" + b.getNumBytes() + ")"
+ ", stage=" + stage
+ ", clientname=" + clientname
+ ", targests=" + Arrays.asList(targets));
+ ", targets=" + Arrays.asList(targets));
}
this.targets = targets;
this.targetStorageTypes = targetStorageTypes;

View File

@ -147,4 +147,9 @@ public boolean isInProgress() {
public void setMaxOpSize(int maxOpSize) {
reader.setMaxOpSize(maxOpSize);
}
@Override
public boolean isLocalLog() {
return true;
}
}

View File

@ -506,4 +506,9 @@ public void setMaxOpSize(int maxOpSize) {
reader.setMaxOpSize(maxOpSize);
}
}
@Override
public boolean isLocalLog() {
return log instanceof FileLog;
}
}

View File

@ -203,4 +203,10 @@ FSEditLogOp getCachedOp() {
* Set the maximum opcode size in bytes.
*/
public abstract void setMaxOpSize(int maxOpSize);
/**
* Returns true if we are currently reading the log from a local disk or an
* even faster data source (e.g. a byte buffer).
*/
public abstract boolean isLocalLog();
}

View File

@ -188,6 +188,13 @@ protected OpInstanceCache initialValue() {
*/
private final List<URI> sharedEditsDirs;
/**
* Take this lock when adding journals to or closing the JournalSet. Allows
* us to ensure that the JournalSet isn't closed or updated underneath us
* in selectInputStreams().
*/
private final Object journalSetLock = new Object();
private static class TransactionId {
public long txid;
@ -252,20 +259,22 @@ private synchronized void initJournals(List<URI> dirs) {
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
journalSet = new JournalSet(minimumRedundantJournals);
synchronized(journalSetLock) {
journalSet = new JournalSet(minimumRedundantJournals);
for (URI u : dirs) {
boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
.contains(u);
if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
StorageDirectory sd = storage.getStorageDirectory(u);
if (sd != null) {
journalSet.add(new FileJournalManager(conf, sd, storage),
required, sharedEditsDirs.contains(u));
for (URI u : dirs) {
boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
.contains(u);
if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
StorageDirectory sd = storage.getStorageDirectory(u);
if (sd != null) {
journalSet.add(new FileJournalManager(conf, sd, storage),
required, sharedEditsDirs.contains(u));
}
} else {
journalSet.add(createJournal(u), required,
sharedEditsDirs.contains(u));
}
} else {
journalSet.add(createJournal(u), required,
sharedEditsDirs.contains(u));
}
}
@ -349,7 +358,9 @@ synchronized void close() {
} finally {
if (journalSet != null && !journalSet.isEmpty()) {
try {
journalSet.close();
synchronized(journalSetLock) {
journalSet.close();
}
} catch (IOException ioe) {
LOG.warn("Error closing journalSet", ioe);
}
@ -606,7 +617,9 @@ public void logSync() {
"due to " + e.getMessage() + ". " +
"Unsynced transactions: " + (txid - synctxid);
LOG.fatal(msg, new Exception());
IOUtils.cleanup(LOG, journalSet);
synchronized(journalSetLock) {
IOUtils.cleanup(LOG, journalSet);
}
terminate(1, msg);
}
} finally {
@ -630,7 +643,9 @@ public void logSync() {
"Could not sync enough journals to persistent storage. "
+ "Unsynced transactions: " + (txid - synctxid);
LOG.fatal(msg, new Exception());
IOUtils.cleanup(LOG, journalSet);
synchronized(journalSetLock) {
IOUtils.cleanup(LOG, journalSet);
}
terminate(1, msg);
}
}
@ -1303,9 +1318,8 @@ synchronized void waitForSyncToFinish() {
/**
* Return the txid of the last synced transaction.
* For test use only
*/
synchronized long getSyncTxId() {
public synchronized long getSyncTxId() {
return synctxid;
}
@ -1342,7 +1356,9 @@ synchronized void registerBackupNode(
LOG.info("Registering new backup node: " + bnReg);
BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
journalSet.add(bjm, false);
synchronized(journalSetLock) {
journalSet.add(bjm, false);
}
}
synchronized void releaseBackupStream(NamenodeRegistration registration)
@ -1350,7 +1366,9 @@ synchronized void releaseBackupStream(NamenodeRegistration registration)
BackupJournalManager bjm = this.findBackupJournal(registration);
if (bjm != null) {
LOG.info("Removing backup journal " + bjm);
journalSet.remove(bjm);
synchronized(journalSetLock) {
journalSet.remove(bjm);
}
}
}
@ -1489,11 +1507,16 @@ public Collection<EditLogInputStream> selectInputStreams(
* @param recovery recovery context
* @param inProgressOk set to true if in-progress streams are OK
*/
public synchronized Collection<EditLogInputStream> selectInputStreams(
public Collection<EditLogInputStream> selectInputStreams(
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
boolean inProgressOk) throws IOException {
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
selectInputStreams(streams, fromTxId, inProgressOk);
synchronized(journalSetLock) {
Preconditions.checkState(journalSet.isOpen(), "Cannot call " +
"selectInputStreams() on closed FSEditLog");
selectInputStreams(streams, fromTxId, inProgressOk);
}
try {
checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);

View File

@ -187,17 +187,27 @@ public List<RemoteEditLog> getRemoteEditLogs(long firstTxId,
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
allLogFiles.size());
for (EditLogFile elf : allLogFiles) {
if (elf.hasCorruptHeader() || (!inProgressOk && elf.isInProgress())) {
continue;
}
if (elf.isInProgress()) {
try {
elf.validateLog();
} catch (IOException e) {
LOG.error("got IOException while trying to validate header of " +
elf + ". Skipping.", e);
continue;
}
}
if (elf.getFirstTxId() >= firstTxId) {
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId,
elf.isInProgress()));
} else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) {
// If the firstTxId is in the middle of an edit log segment. Return this
// anyway and let the caller figure out whether it wants to use it.
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId,
elf.isInProgress()));
}
}

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.protocol.Block;
import java.util.List;
/**
* Translates from edit log ops to inotify events.
*/
@InterfaceAudience.Private
public class InotifyFSEditLogOpTranslator {
private static long getSize(FSEditLogOp.AddCloseOp acOp) {
long size = 0;
for (Block b : acOp.getBlocks()) {
size += b.getNumBytes();
}
return size;
}
public static Event[] translate(FSEditLogOp op) {
switch(op.opCode) {
case OP_ADD:
FSEditLogOp.AddOp addOp = (FSEditLogOp.AddOp) op;
if (addOp.blocks.length == 0) { // create
return new Event[] { new Event.CreateEvent.Builder().path(addOp.path)
.ctime(addOp.atime)
.replication(addOp.replication)
.ownerName(addOp.permissions.getUserName())
.groupName(addOp.permissions.getGroupName())
.perms(addOp.permissions.getPermission())
.iNodeType(Event.CreateEvent.INodeType.FILE).build() };
} else {
return new Event[] { new Event.AppendEvent(addOp.path) };
}
case OP_CLOSE:
FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op;
return new Event[] {
new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) };
case OP_SET_REPLICATION:
FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.REPLICATION)
.path(setRepOp.path)
.replication(setRepOp.replication).build() };
case OP_CONCAT_DELETE:
FSEditLogOp.ConcatDeleteOp cdOp = (FSEditLogOp.ConcatDeleteOp) op;
List<Event> events = Lists.newArrayList();
events.add(new Event.AppendEvent(cdOp.trg));
for (String src : cdOp.srcs) {
events.add(new Event.UnlinkEvent(src, cdOp.timestamp));
}
events.add(new Event.CloseEvent(cdOp.trg, -1, cdOp.timestamp));
return events.toArray(new Event[0]);
case OP_RENAME_OLD:
FSEditLogOp.RenameOldOp rnOpOld = (FSEditLogOp.RenameOldOp) op;
return new Event[] {
new Event.RenameEvent(rnOpOld.src, rnOpOld.dst, rnOpOld.timestamp) };
case OP_RENAME:
FSEditLogOp.RenameOp rnOp = (FSEditLogOp.RenameOp) op;
return new Event[] {
new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) };
case OP_DELETE:
FSEditLogOp.DeleteOp delOp = (FSEditLogOp.DeleteOp) op;
return new Event[] { new Event.UnlinkEvent(delOp.path, delOp.timestamp) };
case OP_MKDIR:
FSEditLogOp.MkdirOp mkOp = (FSEditLogOp.MkdirOp) op;
return new Event[] { new Event.CreateEvent.Builder().path(mkOp.path)
.ctime(mkOp.timestamp)
.ownerName(mkOp.permissions.getUserName())
.groupName(mkOp.permissions.getGroupName())
.perms(mkOp.permissions.getPermission())
.iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() };
case OP_SET_PERMISSIONS:
FSEditLogOp.SetPermissionsOp permOp = (FSEditLogOp.SetPermissionsOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.PERMS)
.path(permOp.src)
.perms(permOp.permissions).build() };
case OP_SET_OWNER:
FSEditLogOp.SetOwnerOp ownOp = (FSEditLogOp.SetOwnerOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.OWNER)
.path(ownOp.src)
.ownerName(ownOp.username).groupName(ownOp.groupname).build() };
case OP_TIMES:
FSEditLogOp.TimesOp timesOp = (FSEditLogOp.TimesOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.TIMES)
.path(timesOp.path)
.atime(timesOp.atime).mtime(timesOp.mtime).build() };
case OP_SYMLINK:
FSEditLogOp.SymlinkOp symOp = (FSEditLogOp.SymlinkOp) op;
return new Event[] { new Event.CreateEvent.Builder().path(symOp.path)
.ctime(symOp.atime)
.ownerName(symOp.permissionStatus.getUserName())
.groupName(symOp.permissionStatus.getGroupName())
.perms(symOp.permissionStatus.getPermission())
.symlinkTarget(symOp.value)
.iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() };
case OP_REMOVE_XATTR:
FSEditLogOp.RemoveXAttrOp rxOp = (FSEditLogOp.RemoveXAttrOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
.path(rxOp.src)
.xAttrs(rxOp.xAttrs)
.xAttrsRemoved(true).build() };
case OP_SET_XATTR:
FSEditLogOp.SetXAttrOp sxOp = (FSEditLogOp.SetXAttrOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
.path(sxOp.src)
.xAttrs(sxOp.xAttrs)
.xAttrsRemoved(false).build() };
case OP_SET_ACL:
FSEditLogOp.SetAclOp saOp = (FSEditLogOp.SetAclOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.ACLS)
.path(saOp.src)
.acls(saOp.aclEntries).build() };
default:
return null;
}
}
}
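A small usage sketch (not in the patch; it assumes an EditLogInputStream from the same package plus java.io.IOException and java.util.Arrays imports) showing how a caller can drain a stream of ops through this translator; ops without an inotify mapping return null and are skipped, which is how the NameNode RPC server uses it later in this commit:
static List<Event> drain(EditLogInputStream elis) throws IOException {
  List<Event> all = Lists.newArrayList();
  FSEditLogOp op;
  while ((op = elis.readOp()) != null) {
    Event[] translated = InotifyFSEditLogOpTranslator.translate(op);
    if (translated != null) {   // null means the op has no inotify equivalent
      all.addAll(Arrays.asList(translated));
    }
  }
  return all;
}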

View File

@ -56,6 +56,17 @@
public class JournalSet implements JournalManager {
static final Log LOG = LogFactory.getLog(FSEditLog.class);
private static final Comparator<EditLogInputStream>
LOCAL_LOG_PREFERENCE_COMPARATOR = new Comparator<EditLogInputStream>() {
@Override
public int compare(EditLogInputStream elis1, EditLogInputStream elis2) {
// we want local logs to be ordered earlier in the collection, and true
// is considered larger than false, so we want to invert the booleans here
return ComparisonChain.start().compare(!elis1.isLocalLog(),
!elis2.isLocalLog()).result();
}
};
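The inversion leans on standard Boolean ordering (false sorts before true); a one-line sanity check of that assumption, purely illustrative:
// a local stream has isLocalLog() == true, which negates to false and sorts first
boolean localSortsFirst = Boolean.compare(!true, !false) < 0;   // evaluates to true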
static final public Comparator<EditLogInputStream>
EDIT_LOG_INPUT_STREAM_COMPARATOR = new Comparator<EditLogInputStream>() {
@ -180,6 +191,8 @@ public boolean isShared() {
private final List<JournalAndStream> journals =
new CopyOnWriteArrayList<JournalSet.JournalAndStream>();
final int minimumRedundantJournals;
private boolean closed;
JournalSet(int minimumRedundantResources) {
this.minimumRedundantJournals = minimumRedundantResources;
@ -233,6 +246,11 @@ public void apply(JournalAndStream jas) throws IOException {
jas.close();
}
}, "close journal");
closed = true;
}
public boolean isOpen() {
return !closed;
}
/**
@ -281,10 +299,25 @@ public static void chainAndMakeRedundantStreams(
if (acc.isEmpty()) {
acc.add(elis);
} else {
long accFirstTxId = acc.get(0).getFirstTxId();
EditLogInputStream accFirst = acc.get(0);
long accFirstTxId = accFirst.getFirstTxId();
if (accFirstTxId == elis.getFirstTxId()) {
acc.add(elis);
// if we have a finalized log segment available at this txid,
// we should throw out all in-progress segments at this txid
if (elis.isInProgress()) {
if (accFirst.isInProgress()) {
acc.add(elis);
}
} else {
if (accFirst.isInProgress()) {
acc.clear();
}
acc.add(elis);
}
} else if (accFirstTxId < elis.getFirstTxId()) {
// try to read from the local logs first since the throughput should
// be higher
Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR);
outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear();
acc.add(elis);
@ -296,6 +329,7 @@ public static void chainAndMakeRedundantStreams(
}
}
if (!acc.isEmpty()) {
Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR);
outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear();
}

View File

@ -34,6 +34,7 @@
import java.util.List;
import java.util.Set;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
@ -66,6 +67,8 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventsList;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -1471,5 +1474,116 @@ public void removeXAttr(String src, XAttr xAttr) throws IOException {
public void checkAccess(String path, FsAction mode) throws IOException {
namesystem.checkAccess(path, mode);
}
@Override // ClientProtocol
public long getCurrentEditLogTxid() throws IOException {
namesystem.checkOperation(OperationCategory.READ); // only active
namesystem.checkSuperuserPrivilege();
// if it's not yet open for write, we may be in the process of transitioning
// from standby to active and may not yet know what the latest committed
// txid is
return namesystem.getEditLog().isOpenForWrite() ?
namesystem.getEditLog().getLastWrittenTxId() : -1;
}
private static FSEditLogOp readOp(EditLogInputStream elis)
throws IOException {
try {
return elis.readOp();
// we can get the below two exceptions if a segment is deleted
// (because we have accumulated too many edits) or (for the local journal/
// no-QJM case only) if an in-progress segment is finalized under us ...
// no need to throw an exception back to the client in this case
} catch (FileNotFoundException e) {
LOG.debug("Tried to read from deleted or moved edit log segment", e);
return null;
} catch (TransferFsImage.HttpGetFailedException e) {
LOG.debug("Tried to read from deleted edit log segment", e);
return null;
}
}
@Override // ClientProtocol
public EventsList getEditsFromTxid(long txid) throws IOException {
namesystem.checkOperation(OperationCategory.READ); // only active
namesystem.checkSuperuserPrivilege();
int maxEventsPerRPC = nn.conf.getInt(
DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY,
DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT);
FSEditLog log = namesystem.getFSImage().getEditLog();
long syncTxid = log.getSyncTxId();
// If we haven't synced anything yet, we can only read finalized
// segments since we can't reliably determine which txns in in-progress
// segments have actually been committed (e.g. written to a quorum of JNs).
// If we have synced txns, we can definitely read up to syncTxid since
// syncTxid is only updated after a transaction is committed to all
// journals. (In-progress segments written by old writers are already
// discarded for us, so if we read any in-progress segments they are
// guaranteed to have been written by this NameNode.)
boolean readInProgress = syncTxid > 0;
List<Event> events = Lists.newArrayList();
long maxSeenTxid = -1;
long firstSeenTxid = -1;
if (syncTxid > 0 && txid > syncTxid) {
// we can't read past syncTxid, so there's no point in going any further
return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
}
Collection<EditLogInputStream> streams = null;
try {
streams = log.selectInputStreams(txid, 0, null, readInProgress);
} catch (IllegalStateException e) { // can happen if we have
// transitioned out of active and haven't yet transitioned to standby
// and are using QJM -- the edit log will be closed and this exception
// will result
LOG.info("NN is transitioning from active to standby and FSEditLog " +
"is closed -- could not read edits");
return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
}
boolean breakOuter = false;
for (EditLogInputStream elis : streams) {
// our assumption in this code is the EditLogInputStreams are ordered by
// starting txid
try {
FSEditLogOp op = null;
while ((op = readOp(elis)) != null) {
// break out of here in the unlikely event that syncTxid is so
// out of date that its segment has already been deleted, so the first
// txid we get is greater than syncTxid
if (syncTxid > 0 && op.getTransactionId() > syncTxid) {
breakOuter = true;
break;
}
Event[] eventsFromOp = InotifyFSEditLogOpTranslator.translate(op);
if (eventsFromOp != null) {
events.addAll(Arrays.asList(eventsFromOp));
}
if (op.getTransactionId() > maxSeenTxid) {
maxSeenTxid = op.getTransactionId();
}
if (firstSeenTxid == -1) {
firstSeenTxid = op.getTransactionId();
}
if (events.size() >= maxEventsPerRPC || (syncTxid > 0 &&
op.getTransactionId() == syncTxid)) {
// we're done
breakOuter = true;
break;
}
}
} finally {
elis.close();
}
if (breakOuter) {
break;
}
}
return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
}
}
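On the client side these events are consumed through DFSInotifyEventInputStream; a minimal polling sketch (illustrative helper, using the same calls as the test added later in this commit):
static Event nextEvent(DFSClient client) throws IOException,
    InterruptedException, MissingEventsException {
  DFSInotifyEventInputStream eis = client.getInotifyEventStream();
  Event next;
  // sketch only: a bounded wait or backoff would be preferable in production
  while ((next = eis.poll()) == null) {
    Thread.sleep(10);
  }
  return next;
}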

View File

@ -279,4 +279,9 @@ public void setMaxOpSize(int maxOpSize) {
elis.setMaxOpSize(maxOpSize);
}
}
@Override
public boolean isLocalLog() {
return streams[curIdx].isLocalLog();
}
}

View File

@ -63,7 +63,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.mortbay.jetty.EofException;
/**
* This class provides fetching a specified file from the NameNode.
@ -370,6 +370,9 @@ private static void copyFileToStream(OutputStream out, File localfile,
throttler.throttle(num, canceler);
}
}
} catch (EofException e) {
LOG.info("Connection closed by client");
out = null; // so we don't close in the finally
} finally {
if (out != null) {
out.close();

View File

@ -33,6 +33,7 @@ import "hdfs.proto";
import "acl.proto";
import "xattr.proto";
import "encryption.proto";
import "inotify.proto";
/**
* The ClientNamenodeProtocol Service defines the interface between a client
@ -665,6 +666,21 @@ message CheckAccessRequestProto {
message CheckAccessResponseProto { // void response
}
message GetCurrentEditLogTxidRequestProto {
}
message GetCurrentEditLogTxidResponseProto {
required int64 txid = 1;
}
message GetEditsFromTxidRequestProto {
required int64 txid = 1;
}
message GetEditsFromTxidResponseProto {
required EventsListProto eventsList = 1;
}
service ClientNamenodeProtocol {
rpc getBlockLocations(GetBlockLocationsRequestProto)
returns(GetBlockLocationsResponseProto);
@ -802,4 +818,8 @@ service ClientNamenodeProtocol {
returns(ListEncryptionZonesResponseProto);
rpc getEZForPath(GetEZForPathRequestProto)
returns(GetEZForPathResponseProto);
rpc getCurrentEditLogTxid(GetCurrentEditLogTxidRequestProto)
returns(GetCurrentEditLogTxidResponseProto);
rpc getEditsFromTxid(GetEditsFromTxidRequestProto)
returns(GetEditsFromTxidResponseProto);
}

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* These .proto interfaces are private and stable.
* Please see http://wiki.apache.org/hadoop/Compatibility
* for what changes are allowed for a *stable* .proto interface.
*/
// This file contains protocol buffers used to communicate edits to clients
// as part of the inotify system.
option java_package = "org.apache.hadoop.hdfs.protocol.proto";
option java_outer_classname = "InotifyProtos";
option java_generate_equals_and_hash = true;
package hadoop.hdfs;
import "acl.proto";
import "xattr.proto";
import "hdfs.proto";
enum EventType {
EVENT_CREATE = 0x0;
EVENT_CLOSE = 0x1;
EVENT_APPEND = 0x2;
EVENT_RENAME = 0x3;
EVENT_METADATA = 0x4;
EVENT_UNLINK = 0x5;
}
message EventProto {
required EventType type = 1;
required bytes contents = 2;
}
enum INodeType {
I_TYPE_FILE = 0x0;
I_TYPE_DIRECTORY = 0x1;
I_TYPE_SYMLINK = 0x2;
}
enum MetadataUpdateType {
META_TYPE_TIMES = 0x0;
META_TYPE_REPLICATION = 0x1;
META_TYPE_OWNER = 0x2;
META_TYPE_PERMS = 0x3;
META_TYPE_ACLS = 0x4;
META_TYPE_XATTRS = 0x5;
}
message CreateEventProto {
required INodeType type = 1;
required string path = 2;
required int64 ctime = 3;
required string ownerName = 4;
required string groupName = 5;
required FsPermissionProto perms = 6;
optional int32 replication = 7;
optional string symlinkTarget = 8;
}
message CloseEventProto {
required string path = 1;
required int64 fileSize = 2;
required int64 timestamp = 3;
}
message AppendEventProto {
required string path = 1;
}
message RenameEventProto {
required string srcPath = 1;
required string destPath = 2;
required int64 timestamp = 3;
}
message MetadataUpdateEventProto {
required string path = 1;
required MetadataUpdateType type = 2;
optional int64 mtime = 3;
optional int64 atime = 4;
optional int32 replication = 5;
optional string ownerName = 6;
optional string groupName = 7;
optional FsPermissionProto perms = 8;
repeated AclEntryProto acls = 9;
repeated XAttrProto xAttrs = 10;
optional bool xAttrsRemoved = 11;
}
message UnlinkEventProto {
required string path = 1;
required int64 timestamp = 2;
}
message EventsListProto {
repeated EventProto events = 1;
required int64 firstTxid = 2;
required int64 lastTxid = 3;
required int64 syncTxid = 4;
}
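For orientation, a short Java sketch (mirroring the PBHelper conversions earlier in this commit) of how a concrete event rides inside the generic EventProto envelope through the contents bytes:
// wrap: serialize the concrete proto and tag it with its EventType
InotifyProtos.CloseEventProto close = InotifyProtos.CloseEventProto.newBuilder()
    .setPath("/tmp/f")
    .setFileSize(1024L)
    .setTimestamp(System.currentTimeMillis())
    .build();
InotifyProtos.EventProto wrapped = InotifyProtos.EventProto.newBuilder()
    .setType(InotifyProtos.EventType.EVENT_CLOSE)
    .setContents(close.toByteString())
    .build();
// unwrap: switch on getType(), then parse the payload back out
// (parseFrom throws InvalidProtocolBufferException)
InotifyProtos.CloseEventProto parsed =
    InotifyProtos.CloseEventProto.parseFrom(wrapped.getContents());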

View File

@ -533,6 +533,28 @@
</description>
</property>
<property>
<name>dfs.client.block.write.replace-datanode-on-failure.best-effort</name>
<value>false</value>
<description>
This property is used only if the value of
dfs.client.block.write.replace-datanode-on-failure.enable is true.
Best effort means that the client will try to replace a failed datanode
in the write pipeline (provided that the policy is satisfied); however, it
continues the write operation even if the datanode replacement also
fails.
Suppose the datanode replacement fails.
false: An exception should be thrown so that the write will fail.
true : The write should be resumed with the remaining datanodes.
Note that setting this property to true allows writing to a pipeline
with a smaller number of datanodes. As a result, it increases the
probability of data loss.
</description>
</property>
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>21600000</value>
@ -2076,4 +2098,14 @@
</description>
</property>
<property>
<name>dfs.namenode.inotify.max.events.per.rpc</name>
<value>1000</value>
<description>Maximum number of events that will be sent to an inotify client
in a single RPC response. The default value attempts to amortize away
the overhead for this RPC while avoiding huge memory requirements for the
client and NameNode (1000 events should consume no more than 1 MB.)
</description>
</property>
</configuration>

View File

@ -0,0 +1,430 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.util.ExitUtil;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.util.EnumSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TestDFSInotifyEventInputStream {
private static final int BLOCK_SIZE = 1024;
private static final Log LOG = LogFactory.getLog(
TestDFSInotifyEventInputStream.class);
private static Event waitForNextEvent(DFSInotifyEventInputStream eis)
throws IOException, MissingEventsException {
Event next = null;
while ((next = eis.poll()) == null);
return next;
}
/**
* If this test fails, check whether the newly added op should map to an
* inotify event, and if so, establish the mapping in
* {@link org.apache.hadoop.hdfs.server.namenode.InotifyFSEditLogOpTranslator}
* and update testBasic() to include the new op.
*/
@Test
public void testOpcodeCount() {
Assert.assertTrue(FSEditLogOpCodes.values().length == 46);
}
/**
* Tests all FsEditLogOps that are converted to inotify events.
*/
@Test(timeout = 120000)
@SuppressWarnings("deprecation")
public void testBasic() throws IOException, URISyntaxException,
InterruptedException, MissingEventsException {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
// so that we can get an atime change
conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 1);
MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf);
builder.getDfsBuilder().numDataNodes(2);
MiniQJMHACluster cluster = builder.build();
try {
cluster.getDfsCluster().waitActive();
cluster.getDfsCluster().transitionToActive(0);
DFSClient client = new DFSClient(cluster.getDfsCluster().getNameNode(0)
.getNameNodeAddress(), conf);
FileSystem fs = cluster.getDfsCluster().getFileSystem(0);
DFSTestUtil.createFile(fs, new Path("/file"), BLOCK_SIZE, (short) 1, 0L);
DFSTestUtil.createFile(fs, new Path("/file3"), BLOCK_SIZE, (short) 1, 0L);
DFSTestUtil.createFile(fs, new Path("/file5"), BLOCK_SIZE, (short) 1, 0L);
DFSInotifyEventInputStream eis = client.getInotifyEventStream();
client.rename("/file", "/file4", null); // RenameOp -> RenameEvent
client.rename("/file4", "/file2"); // RenameOldOp -> RenameEvent
// DeleteOp, AddOp -> UnlinkEvent, CreateEvent
OutputStream os = client.create("/file2", true, (short) 2, BLOCK_SIZE);
os.write(new byte[BLOCK_SIZE]);
os.close(); // CloseOp -> CloseEvent
// AddOp -> AppendEvent
os = client.append("/file2", BLOCK_SIZE, null, null);
os.write(new byte[BLOCK_SIZE]);
os.close(); // CloseOp -> CloseEvent
Thread.sleep(10); // so that the atime will get updated on the next line
client.open("/file2").read(new byte[1]); // TimesOp -> MetadataUpdateEvent
// SetReplicationOp -> MetadataUpdateEvent
client.setReplication("/file2", (short) 1);
// ConcatDeleteOp -> AppendEvent, UnlinkEvent, CloseEvent
client.concat("/file2", new String[]{"/file3"});
client.delete("/file2", false); // DeleteOp -> UnlinkEvent
client.mkdirs("/dir", null, false); // MkdirOp -> CreateEvent
// SetPermissionsOp -> MetadataUpdateEvent
client.setPermission("/dir", FsPermission.valueOf("-rw-rw-rw-"));
// SetOwnerOp -> MetadataUpdateEvent
client.setOwner("/dir", "username", "groupname");
client.createSymlink("/dir", "/dir2", false); // SymlinkOp -> CreateEvent
client.setXAttr("/file5", "user.field", "value".getBytes(), EnumSet.of(
XAttrSetFlag.CREATE)); // SetXAttrOp -> MetadataUpdateEvent
// RemoveXAttrOp -> MetadataUpdateEvent
client.removeXAttr("/file5", "user.field");
// SetAclOp -> MetadataUpdateEvent
client.setAcl("/file5", AclEntry.parseAclSpec(
"user::rwx,user:foo:rw-,group::r--,other::---", true));
client.removeAcl("/file5"); // SetAclOp -> MetadataUpdateEvent
Event next = null;
// RenameOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.RENAME);
Event.RenameEvent re = (Event.RenameEvent) next;
Assert.assertTrue(re.getDstPath().equals("/file4"));
Assert.assertTrue(re.getSrcPath().equals("/file"));
Assert.assertTrue(re.getTimestamp() > 0);
long eventsBehind = eis.getEventsBehindEstimate();
// RenameOldOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.RENAME);
Event.RenameEvent re2 = (Event.RenameEvent) next;
Assert.assertTrue(re2.getDstPath().equals("/file2"));
Assert.assertTrue(re2.getSrcPath().equals("/file4"));
Assert.assertTrue(re.getTimestamp() > 0);
// DeleteOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
Assert.assertTrue(((Event.UnlinkEvent) next).getPath().equals("/file2"));
// AddOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
Event.CreateEvent ce = (Event.CreateEvent) next;
Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE);
Assert.assertTrue(ce.getPath().equals("/file2"));
Assert.assertTrue(ce.getCtime() > 0);
Assert.assertTrue(ce.getReplication() > 0);
Assert.assertTrue(ce.getSymlinkTarget() == null);
// CloseOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
Event.CloseEvent ce2 = (Event.CloseEvent) next;
Assert.assertTrue(ce2.getPath().equals("/file2"));
Assert.assertTrue(ce2.getFileSize() > 0);
Assert.assertTrue(ce2.getTimestamp() > 0);
// AddOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.APPEND);
Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2"));
// CloseOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
Assert.assertTrue(((Event.CloseEvent) next).getPath().equals("/file2"));
// TimesOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) next;
Assert.assertTrue(mue.getPath().equals("/file2"));
Assert.assertTrue(mue.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.TIMES);
// SetReplicationOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) next;
Assert.assertTrue(mue2.getPath().equals("/file2"));
Assert.assertTrue(mue2.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.REPLICATION);
Assert.assertTrue(mue2.getReplication() == 1);
// ConcatDeleteOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.APPEND);
Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2"));
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
Event.UnlinkEvent ue2 = (Event.UnlinkEvent) next;
Assert.assertTrue(ue2.getPath().equals("/file3"));
Assert.assertTrue(ue2.getTimestamp() > 0);
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
Event.CloseEvent ce3 = (Event.CloseEvent) next;
Assert.assertTrue(ce3.getPath().equals("/file2"));
Assert.assertTrue(ce3.getTimestamp() > 0);
// DeleteOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
Event.UnlinkEvent ue = (Event.UnlinkEvent) next;
Assert.assertTrue(ue.getPath().equals("/file2"));
Assert.assertTrue(ue.getTimestamp() > 0);
// MkdirOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
Event.CreateEvent ce4 = (Event.CreateEvent) next;
Assert.assertTrue(ce4.getiNodeType() ==
Event.CreateEvent.INodeType.DIRECTORY);
Assert.assertTrue(ce4.getPath().equals("/dir"));
Assert.assertTrue(ce4.getCtime() > 0);
Assert.assertTrue(ce4.getReplication() == 0);
Assert.assertTrue(ce4.getSymlinkTarget() == null);
// SetPermissionsOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) next;
Assert.assertTrue(mue3.getPath().equals("/dir"));
Assert.assertTrue(mue3.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.PERMS);
Assert.assertTrue(mue3.getPerms().toString().contains("rw-rw-rw-"));
// SetOwnerOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) next;
Assert.assertTrue(mue4.getPath().equals("/dir"));
Assert.assertTrue(mue4.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.OWNER);
Assert.assertTrue(mue4.getOwnerName().equals("username"));
Assert.assertTrue(mue4.getGroupName().equals("groupname"));
// SymlinkOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
Event.CreateEvent ce5 = (Event.CreateEvent) next;
Assert.assertTrue(ce5.getiNodeType() ==
Event.CreateEvent.INodeType.SYMLINK);
Assert.assertTrue(ce5.getPath().equals("/dir2"));
Assert.assertTrue(ce5.getCtime() > 0);
Assert.assertTrue(ce5.getReplication() == 0);
Assert.assertTrue(ce5.getSymlinkTarget().equals("/dir"));
// SetXAttrOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) next;
Assert.assertTrue(mue5.getPath().equals("/file5"));
Assert.assertTrue(mue5.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.XATTRS);
Assert.assertTrue(mue5.getxAttrs().size() == 1);
Assert.assertTrue(mue5.getxAttrs().get(0).getName().contains("field"));
Assert.assertTrue(!mue5.isxAttrsRemoved());
// RemoveXAttrOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) next;
Assert.assertTrue(mue6.getPath().equals("/file5"));
Assert.assertTrue(mue6.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.XATTRS);
Assert.assertTrue(mue6.getxAttrs().size() == 1);
Assert.assertTrue(mue6.getxAttrs().get(0).getName().contains("field"));
Assert.assertTrue(mue6.isxAttrsRemoved());
// SetAclOp (1)
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) next;
Assert.assertTrue(mue7.getPath().equals("/file5"));
Assert.assertTrue(mue7.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.ACLS);
Assert.assertTrue(mue7.getAcls().contains(
AclEntry.parseAclEntry("user::rwx", true)));
// SetAclOp (2)
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) next;
Assert.assertTrue(mue8.getPath().equals("/file5"));
Assert.assertTrue(mue8.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.ACLS);
Assert.assertTrue(mue8.getAcls() == null);
// Returns null when there are no further events
Assert.assertTrue(eis.poll() == null);
// make sure the estimate hasn't changed since the above assertion
// tells us that we are fully caught up to the current namesystem state
// and we should not have been behind at all when eventsBehind was set
// either, since there were few enough events that they should have all
// been read to the client during the first poll() call
Assert.assertTrue(eis.getEventsBehindEstimate() == eventsBehind);
} finally {
cluster.shutdown();
}
}
@Test(timeout = 120000)
public void testNNFailover() throws IOException, URISyntaxException,
MissingEventsException {
Configuration conf = new HdfsConfiguration();
MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();
try {
cluster.getDfsCluster().waitActive();
cluster.getDfsCluster().transitionToActive(0);
DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs
(cluster.getDfsCluster(), conf)).dfs;
DFSInotifyEventInputStream eis = client.getInotifyEventStream();
for (int i = 0; i < 10; i++) {
client.mkdirs("/dir" + i, null, false);
}
cluster.getDfsCluster().shutdownNameNode(0);
cluster.getDfsCluster().transitionToActive(1);
Event next = null;
// we can read all of the edits logged by the old active from the new
// active
for (int i = 0; i < 10; i++) {
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
i));
}
Assert.assertTrue(eis.poll() == null);
} finally {
cluster.shutdown();
}
}
@Test(timeout = 120000)
public void testTwoActiveNNs() throws IOException, MissingEventsException {
Configuration conf = new HdfsConfiguration();
MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();
try {
cluster.getDfsCluster().waitActive();
cluster.getDfsCluster().transitionToActive(0);
DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0)
.getNameNodeAddress(), conf);
DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1)
.getNameNodeAddress(), conf);
DFSInotifyEventInputStream eis = client0.getInotifyEventStream();
for (int i = 0; i < 10; i++) {
client0.mkdirs("/dir" + i, null, false);
}
cluster.getDfsCluster().transitionToActive(1);
for (int i = 10; i < 20; i++) {
client1.mkdirs("/dir" + i, null, false);
}
// make sure that the old active can't read any further than the edits
// it logged itself (it has no idea whether the in-progress edits from
// the other writer have actually been committed)
Event next = null;
for (int i = 0; i < 10; i++) {
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
i));
}
Assert.assertTrue(eis.poll() == null);
} finally {
try {
cluster.shutdown();
} catch (ExitUtil.ExitException e) {
// expected because the old active will be unable to flush the
// end-of-segment op since it is fenced
}
}
}
@Test(timeout = 120000)
public void testReadEventsWithTimeout() throws IOException,
InterruptedException, MissingEventsException {
Configuration conf = new HdfsConfiguration();
MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();
try {
cluster.getDfsCluster().waitActive();
cluster.getDfsCluster().transitionToActive(0);
final DFSClient client = new DFSClient(cluster.getDfsCluster()
.getNameNode(0).getNameNodeAddress(), conf);
DFSInotifyEventInputStream eis = client.getInotifyEventStream();
ScheduledExecutorService ex = Executors
.newSingleThreadScheduledExecutor();
ex.schedule(new Runnable() {
@Override
public void run() {
try {
client.mkdirs("/dir", null, false);
} catch (IOException e) {
// test will fail
LOG.error("Unable to create /dir", e);
}
}
}, 1, TimeUnit.SECONDS);
// a very generous wait period -- the edit will definitely have been
// processed by the time this is up
Event next = eis.poll(5, TimeUnit.SECONDS);
Assert.assertTrue(next != null);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir"));
} finally {
cluster.shutdown();
}
}
}
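
Note on the API exercised above: an application consumes these events by obtaining a DFSInotifyEventInputStream and repeatedly polling it, exactly as the tests do. The fragment below is an illustrative sketch only, not part of this change; it reuses calls that appear in the tests (getInotifyEventStream, poll with a timeout, getEventType), while the enclosing loop, the borrowed client/LOG references from the test context, and the log messages are assumptions.

  // Illustrative sketch, not part of this patch: drain the currently
  // available events from an existing DFSClient and handle two event types.
  DFSInotifyEventInputStream eis = client.getInotifyEventStream();
  Event event;
  while ((event = eis.poll(1, TimeUnit.SECONDS)) != null) {
    switch (event.getEventType()) {
    case CREATE:
      LOG.info("created " + ((Event.CreateEvent) event).getPath());
      break;
    case UNLINK:
      LOG.info("deleted " + ((Event.UnlinkEvent) event).getPath());
      break;
    default:
      LOG.info("event: " + event.getEventType());
    }
  }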

View File

@ -125,7 +125,7 @@ private void recoverFile(final FileSystem fs) throws Exception {
while (!recovered && tries-- > 0) {
try {
out = fs.append(file1);
LOG.info("Successfully opened for appends");
LOG.info("Successfully opened for append");
recovered = true;
} catch (IOException e) {
LOG.info("Failed open for append, waiting on lease recovery");

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
@ -54,7 +55,8 @@ public class TestReplaceDatanodeOnFailure {
/** Test DEFAULT ReplaceDatanodeOnFailure policy. */
@Test
public void testDefaultPolicy() throws Exception {
final ReplaceDatanodeOnFailure p = ReplaceDatanodeOnFailure.DEFAULT;
final Configuration conf = new HdfsConfiguration();
final ReplaceDatanodeOnFailure p = ReplaceDatanodeOnFailure.get(conf);
final DatanodeInfo[] infos = new DatanodeInfo[5];
final DatanodeInfo[][] datanodes = new DatanodeInfo[infos.length + 1][];
@ -113,7 +115,7 @@ public void testReplaceDatanodeOnFailure() throws Exception {
final Configuration conf = new HdfsConfiguration();
//always replace a datanode
ReplaceDatanodeOnFailure.ALWAYS.write(conf);
ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);
final String[] racks = new String[REPLICATION];
Arrays.fill(racks, RACK0);
@ -239,8 +241,6 @@ public void testAppend() throws Exception {
final Configuration conf = new HdfsConfiguration();
final short REPLICATION = (short)3;
Assert.assertEquals(ReplaceDatanodeOnFailure.DEFAULT, ReplaceDatanodeOnFailure.get(conf));
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
).numDataNodes(1).build();
@ -285,4 +285,41 @@ public void testAppend() throws Exception {
if (cluster != null) {cluster.shutdown();}
}
}
@Test
public void testBestEffort() throws Exception {
final Configuration conf = new HdfsConfiguration();
//always replace a datanode but do not throw exception
ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
).numDataNodes(1).build();
try {
final DistributedFileSystem fs = cluster.getFileSystem();
final Path f = new Path(DIR, "testIgnoreReplaceFailure");
final byte[] bytes = new byte[1000];
{
LOG.info("write " + bytes.length + " bytes to " + f);
final FSDataOutputStream out = fs.create(f, REPLICATION);
out.write(bytes);
out.close();
final FileStatus status = fs.getFileStatus(f);
Assert.assertEquals(REPLICATION, status.getReplication());
Assert.assertEquals(bytes.length, status.getLen());
}
{
LOG.info("append another " + bytes.length + " bytes to " + f);
final FSDataOutputStream out = fs.append(f);
out.write(bytes);
out.close();
}
} finally {
if (cluster != null) {cluster.shutdown();}
}
}
}
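
Note on the API change in this file: the instance-style ReplaceDatanodeOnFailure.ALWAYS.write(conf) is replaced by the static write(Policy, boolean, Configuration), where the second argument, judging by the testBestEffort case above, asks the client to keep writing instead of throwing when a replacement datanode cannot be found. A minimal sketch using only methods that appear in this diff (illustrative, not part of the patch):

  // Sketch: store the ALWAYS policy with best effort enabled, then read the
  // effective policy back from the configuration.
  final Configuration conf = new HdfsConfiguration();
  ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);
  final ReplaceDatanodeOnFailure p = ReplaceDatanodeOnFailure.get(conf);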

View File

@ -56,7 +56,9 @@ public static class Builder {
public Builder(Configuration conf) {
this.conf = conf;
this.dfsBuilder = new MiniDFSCluster.Builder(conf);
// most QJMHACluster tests don't need DataNodes, so we'll make
// this the default
this.dfsBuilder = new MiniDFSCluster.Builder(conf).numDataNodes(0);
}
public MiniDFSCluster.Builder getDfsBuilder() {
@ -102,7 +104,7 @@ private MiniQJMHACluster(Builder builder) throws IOException {
cluster = builder.dfsBuilder.nnTopology(topology)
.manageNameDfsSharedDirs(false).build();
cluster.waitActive();
cluster.shutdown();
cluster.shutdownNameNodes();
// initialize the journal nodes
Configuration confNN0 = cluster.getConfiguration(0);
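
Note on the Builder change above: zero DataNodes becomes the default for MiniQJMHACluster, so a test that actually needs DataNodes would presumably opt back in through the exposed MiniDFSCluster builder, roughly as sketched here (illustrative only; the DataNode count is arbitrary):

  // Sketch: override the new zero-DataNode default before building the cluster.
  MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf);
  builder.getDfsBuilder().numDataNodes(3);
  MiniQJMHACluster cluster = builder.build();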

View File

@ -382,7 +382,7 @@ public void afterCall(InvocationOnMock invocation, boolean succeeded) {
}
@Override
protected ExecutorService createExecutor() {
protected ExecutorService createSingleThreadExecutor() {
return MoreExecutors.sameThreadExecutor();
}
}

View File

@ -939,7 +939,7 @@ private QuorumJournalManager createSpyingQJM()
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) {
AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) {
protected ExecutorService createExecutor() {
protected ExecutorService createSingleThreadExecutor() {
// Don't parallelize calls to the quorum in the tests.
// This makes the tests more deterministic.
return MoreExecutors.sameThreadExecutor();

View File

@ -916,6 +916,10 @@ public boolean isInProgress() {
public void setMaxOpSize(int maxOpSize) {
reader.setMaxOpSize(maxOpSize);
}
@Override public boolean isLocalLog() {
return true;
}
}
@Test

View File

@ -441,7 +441,7 @@ public void testCompression() throws IOException {
checkNameSpace(conf);
// read an image compressed in Gzip and store it uncompressed
LOG.info("Read an compressed iamge and store it as uncompressed.");
LOG.info("Read a compressed image and store it as uncompressed.");
conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
checkNameSpace(conf);

View File

@ -572,6 +572,11 @@
<artifactId>jets3t</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>

View File

@ -0,0 +1,19 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<FindBugsFilter>
</FindBugsFilter>

View File

@ -0,0 +1,116 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../../hadoop-project</relativePath>
</parent>
<artifactId>hadoop-aws</artifactId>
<version>3.0.0-SNAPSHOT</version>
<name>Apache Hadoop Amazon Web Services support</name>
<description>
This module contains code to support integration with Amazon Web Services.
It also declares the dependencies needed to work with AWS services.
</description>
<packaging>jar</packaging>
<properties>
<file.encoding>UTF-8</file.encoding>
<downloadSources>true</downloadSources>
</properties>
<profiles>
<profile>
<id>tests-off</id>
<activation>
<file>
<missing>src/test/resources/auth-keys.xml</missing>
</file>
</activation>
<properties>
<maven.test.skip>true</maven.test.skip>
</properties>
</profile>
<profile>
<id>tests-on</id>
<activation>
<file>
<exists>src/test/resources/auth-keys.xml</exists>
</file>
</activation>
<properties>
<maven.test.skip>false</maven.test.skip>
</properties>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<configuration>
<findbugsXmlOutput>true</findbugsXmlOutput>
<xmlOutput>true</xmlOutput>
<excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml
</excludeFilterFile>
<effort>Max</effort>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-project-info-reports-plugin</artifactId>
<configuration>
<dependencyDetailsEnabled>false</dependencyDetailsEnabled>
<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>compile</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -43,6 +43,7 @@
<module>hadoop-openstack</module>
<module>hadoop-sls</module>
<module>hadoop-azure</module>
<module>hadoop-aws</module>
</modules>
<build>