HDFS-3298. Add HdfsDataOutputStream as a public API.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1330064 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-04-24 23:39:43 +00:00
parent 706e861a85
commit ea32198db4
10 changed files with 125 additions and 55 deletions

View File

@ -65,12 +65,15 @@ Trunk (unreleased changes)
HDFS-3273. Refactor BackupImage and FSEditLog, and rename HDFS-3273. Refactor BackupImage and FSEditLog, and rename
JournalListener.rollLogs(..) to startLogSegment(..). (szetszwo) JournalListener.rollLogs(..) to startLogSegment(..). (szetszwo)
HDFS-3292. Remove the deprecated DiskStatus, getDiskStatus(), getRawCapacity() and HDFS-3292. Remove the deprecated DiskStatus, getDiskStatus(), getRawUsed()
getRawUsed() from DistributedFileSystem. (Arpit Gupta via szetszwo) and getRawCapacity() from DistributedFileSystem. (Arpit Gupta via szetszwo)
HDFS-3282. Expose getFileLength API. (umamahesh) HDFS-3282. Expose getFileLength API. (umamahesh)
HADOOP-8285 HDFS changes for Use ProtoBuf for RpcPayLoadHeader (sanjay radia) HADOOP-8285. HDFS changes for Use ProtoBuf for RpcPayLoadHeader. (sanjay
radia)
HDFS-3298. Add HdfsDataOutputStream as a public API. (szetszwo)
OPTIMIZATIONS OPTIMIZATIONS

View File

@ -78,7 +78,6 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsServerDefaults;
@ -91,6 +90,7 @@
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@ -996,7 +996,7 @@ public OutputStream create(String src,
* Call {@link #create(String, FsPermission, EnumSet, boolean, short, * Call {@link #create(String, FsPermission, EnumSet, boolean, short,
* long, Progressable, int)} with <code>createParent</code> set to true. * long, Progressable, int)} with <code>createParent</code> set to true.
*/ */
public OutputStream create(String src, public DFSOutputStream create(String src,
FsPermission permission, FsPermission permission,
EnumSet<CreateFlag> flag, EnumSet<CreateFlag> flag,
short replication, short replication,
@ -1029,7 +1029,7 @@ public OutputStream create(String src,
* @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable,
* boolean, short, long) for detailed description of exceptions thrown * boolean, short, long) for detailed description of exceptions thrown
*/ */
public OutputStream create(String src, public DFSOutputStream create(String src,
FsPermission permission, FsPermission permission,
EnumSet<CreateFlag> flag, EnumSet<CreateFlag> flag,
boolean createParent, boolean createParent,
@ -1078,7 +1078,7 @@ private DFSOutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag,
* Progressable, int)} except that the permission * Progressable, int)} except that the permission
* is absolute (ie has already been masked with umask. * is absolute (ie has already been masked with umask.
*/ */
public OutputStream primitiveCreate(String src, public DFSOutputStream primitiveCreate(String src,
FsPermission absPermission, FsPermission absPermission,
EnumSet<CreateFlag> flag, EnumSet<CreateFlag> flag,
boolean createParent, boolean createParent,
@ -1169,11 +1169,11 @@ private DFSOutputStream callAppend(HdfsFileStatus stat, String src,
* *
* @see ClientProtocol#append(String, String) * @see ClientProtocol#append(String, String)
*/ */
public FSDataOutputStream append(final String src, final int buffersize, public HdfsDataOutputStream append(final String src, final int buffersize,
final Progressable progress, final FileSystem.Statistics statistics final Progressable progress, final FileSystem.Statistics statistics
) throws IOException { ) throws IOException {
final DFSOutputStream out = append(src, buffersize, progress); final DFSOutputStream out = append(src, buffersize, progress);
return new FSDataOutputStream(out, statistics, out.getInitialLen()); return new HdfsDataOutputStream(out, statistics, out.getInitialLen());
} }
private DFSOutputStream append(String src, int buffersize, Progressable progress) private DFSOutputStream append(String src, int buffersize, Progressable progress)

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -99,7 +100,7 @@
* starts sending packets from the dataQueue. * starts sending packets from the dataQueue.
****************************************************************/ ****************************************************************/
@InterfaceAudience.Private @InterfaceAudience.Private
class DFSOutputStream extends FSOutputSummer implements Syncable { public final class DFSOutputStream extends FSOutputSummer implements Syncable {
private final DFSClient dfsClient; private final DFSClient dfsClient;
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
private Socket s; private Socket s;
@ -1530,14 +1531,20 @@ public synchronized void hsync() throws IOException {
} }
/** /**
* Returns the number of replicas of current block. This can be different * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
* from the designated replication factor of the file because the NameNode */
* does not replicate the block to which a client is currently writing to. @Deprecated
* The client continues to write to a block even if a few datanodes in the public synchronized int getNumCurrentReplicas() throws IOException {
* write pipeline have failed. return getCurrentBlockReplication();
}
/**
* Note that this is not a public API;
* use {@link HdfsDataOutputStream#getCurrentBlockReplication()} instead.
*
* @return the number of valid replicas of the current block * @return the number of valid replicas of the current block
*/ */
public synchronized int getNumCurrentReplicas() throws IOException { public synchronized int getCurrentBlockReplication() throws IOException {
dfsClient.checkOpen(); dfsClient.checkOpen();
isClosed(); isClosed();
if (streamer == null) { if (streamer == null) {

View File

@ -33,7 +33,6 @@
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsServerDefaults;
@ -47,6 +46,7 @@
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -216,31 +216,33 @@ public HdfsDataInputStream open(Path f, int bufferSize) throws IOException {
/** This optional operation is not yet supported. */ /** This optional operation is not yet supported. */
@Override @Override
public FSDataOutputStream append(Path f, int bufferSize, public HdfsDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException { Progressable progress) throws IOException {
statistics.incrementWriteOps(1); statistics.incrementWriteOps(1);
return dfs.append(getPathName(f), bufferSize, progress, statistics); return dfs.append(getPathName(f), bufferSize, progress, statistics);
} }
@Override @Override
public FSDataOutputStream create(Path f, FsPermission permission, public HdfsDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize, boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException { Progressable progress) throws IOException {
statistics.incrementWriteOps(1); statistics.incrementWriteOps(1);
return new FSDataOutputStream(dfs.create(getPathName(f), permission, final EnumSet<CreateFlag> cflags = overwrite?
overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress, : EnumSet.of(CreateFlag.CREATE);
bufferSize), statistics); final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags,
replication, blockSize, progress, bufferSize);
return new HdfsDataOutputStream(out, statistics);
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Override @Override
protected FSDataOutputStream primitiveCreate(Path f, protected HdfsDataOutputStream primitiveCreate(Path f,
FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize, FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
short replication, long blockSize, Progressable progress, short replication, long blockSize, Progressable progress,
int bytesPerChecksum) throws IOException { int bytesPerChecksum) throws IOException {
statistics.incrementReadOps(1); statistics.incrementReadOps(1);
return new FSDataOutputStream(dfs.primitiveCreate(getPathName(f), return new HdfsDataOutputStream(dfs.primitiveCreate(getPathName(f),
absolutePermission, flag, true, replication, blockSize, absolutePermission, flag, true, replication, blockSize,
progress, bufferSize, bytesPerChecksum),statistics); progress, bufferSize, bytesPerChecksum),statistics);
} }
@ -248,14 +250,14 @@ protected FSDataOutputStream primitiveCreate(Path f,
/** /**
* Same as create(), except fails if parent directory doesn't already exist. * Same as create(), except fails if parent directory doesn't already exist.
*/ */
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, public HdfsDataOutputStream createNonRecursive(Path f, FsPermission permission,
EnumSet<CreateFlag> flag, int bufferSize, short replication, EnumSet<CreateFlag> flag, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException { long blockSize, Progressable progress) throws IOException {
statistics.incrementWriteOps(1); statistics.incrementWriteOps(1);
if (flag.contains(CreateFlag.OVERWRITE)) { if (flag.contains(CreateFlag.OVERWRITE)) {
flag.add(CreateFlag.CREATE); flag.add(CreateFlag.CREATE);
} }
return new FSDataOutputStream(dfs.create(getPathName(f), permission, flag, return new HdfsDataOutputStream(dfs.create(getPathName(f), permission, flag,
false, replication, blockSize, progress, bufferSize), statistics); false, replication, blockSize, progress, bufferSize), statistics);
} }

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSOutputStream;
/**
* The Hdfs implementation of {@link FSDataOutputStream}.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HdfsDataOutputStream extends FSDataOutputStream {
public HdfsDataOutputStream(DFSOutputStream out, FileSystem.Statistics stats,
long startPosition) throws IOException {
super(out, stats, startPosition);
}
public HdfsDataOutputStream(DFSOutputStream out, FileSystem.Statistics stats
) throws IOException {
this(out, stats, 0L);
}
/**
* Get the actual number of replicas of the current block.
*
* This can be different from the designated replication factor of the file
* because the namenode does not maintain replication for the blocks which are
* currently being written to. Depending on the configuration, the client may
* continue to write to a block even if a few datanodes in the write pipeline
* have failed, or the client may add a new datanodes once a datanode has
* failed.
*
* @return the number of valid replicas of the current block
*/
public synchronized int getCurrentBlockReplication() throws IOException {
return ((DFSOutputStream)getWrappedStream()).getCurrentBlockReplication();
}
}

View File

@ -87,7 +87,6 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -103,7 +102,6 @@
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.Files;
/** /**
* This class creates a single-process DFS cluster for junit testing. * This class creates a single-process DFS cluster for junit testing.
@ -1579,7 +1577,7 @@ public boolean isDataNodeUp() {
/** /**
* Get a client handle to the DFS cluster with a single namenode. * Get a client handle to the DFS cluster with a single namenode.
*/ */
public FileSystem getFileSystem() throws IOException { public DistributedFileSystem getFileSystem() throws IOException {
checkSingleNameNode(); checkSingleNameNode();
return getFileSystem(0); return getFileSystem(0);
} }
@ -1587,8 +1585,9 @@ public FileSystem getFileSystem() throws IOException {
/** /**
* Get a client handle to the DFS cluster for the namenode at given index. * Get a client handle to the DFS cluster for the namenode at given index.
*/ */
public FileSystem getFileSystem(int nnIndex) throws IOException { public DistributedFileSystem getFileSystem(int nnIndex) throws IOException {
return FileSystem.get(getURI(nnIndex), nameNodes[nnIndex].conf); return (DistributedFileSystem)FileSystem.get(getURI(nnIndex),
nameNodes[nnIndex].conf);
} }
/** /**

View File

@ -48,7 +48,7 @@ public void testBlocksScheduledCounter() throws IOException {
out.write(i); out.write(i);
} }
// flush to make sure a block is allocated. // flush to make sure a block is allocated.
((DFSOutputStream)(out.getWrappedStream())).hflush(); out.hflush();
ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>(); ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>();
final DatanodeManager dm = cluster.getNamesystem().getBlockManager( final DatanodeManager dm = cluster.getNamesystem().getBlockManager(

View File

@ -31,6 +31,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.junit.Assume.assumeTrue;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.File; import java.io.File;
@ -53,6 +54,7 @@
import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -68,8 +70,6 @@
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import static org.junit.Assume.assumeTrue;
/** /**
* This class tests various cases during file creation. * This class tests various cases during file creation.
*/ */
@ -99,6 +99,11 @@ public static FSDataOutputStream createFile(FileSystem fileSys, Path name, int r
return stm; return stm;
} }
public static HdfsDataOutputStream create(DistributedFileSystem dfs,
Path name, int repl) throws IOException {
return (HdfsDataOutputStream)createFile(dfs, name, repl);
}
// //
// writes to file but does not close it // writes to file but does not close it
// //
@ -494,7 +499,7 @@ public void xxxtestFileCreationNamenodeRestart() throws IOException {
// create cluster // create cluster
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = null; DistributedFileSystem fs = null;
try { try {
cluster.waitActive(); cluster.waitActive();
fs = cluster.getFileSystem(); fs = cluster.getFileSystem();
@ -502,21 +507,17 @@ public void xxxtestFileCreationNamenodeRestart() throws IOException {
// create a new file. // create a new file.
Path file1 = new Path("/filestatus.dat"); Path file1 = new Path("/filestatus.dat");
FSDataOutputStream stm = createFile(fs, file1, 1); HdfsDataOutputStream stm = create(fs, file1, 1);
System.out.println("testFileCreationNamenodeRestart: " System.out.println("testFileCreationNamenodeRestart: "
+ "Created file " + file1); + "Created file " + file1);
int actualRepl = ((DFSOutputStream)(stm.getWrappedStream())). assertEquals(file1 + " should be replicated to 1 datanode.", 1,
getNumCurrentReplicas(); stm.getCurrentBlockReplication());
assertTrue(file1 + " should be replicated to 1 datanodes.",
actualRepl == 1);
// write two full blocks. // write two full blocks.
writeFile(stm, numBlocks * blockSize); writeFile(stm, numBlocks * blockSize);
stm.hflush(); stm.hflush();
actualRepl = ((DFSOutputStream)(stm.getWrappedStream())). assertEquals(file1 + " should still be replicated to 1 datanode.", 1,
getNumCurrentReplicas(); stm.getCurrentBlockReplication());
assertTrue(file1 + " should still be replicated to 1 datanodes.",
actualRepl == 1);
// rename file wile keeping it open. // rename file wile keeping it open.
Path fileRenamed = new Path("/filestatusRenamed.dat"); Path fileRenamed = new Path("/filestatusRenamed.dat");
@ -849,11 +850,10 @@ public void testLeaseExpireHardLimit() throws Exception {
// create a new file. // create a new file.
final String f = DIR + "foo"; final String f = DIR + "foo";
final Path fpath = new Path(f); final Path fpath = new Path(f);
FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM); HdfsDataOutputStream out = create(dfs, fpath, DATANODE_NUM);
out.write("something".getBytes()); out.write("something".getBytes());
out.hflush(); out.hflush();
int actualRepl = ((DFSOutputStream)(out.getWrappedStream())). int actualRepl = out.getCurrentBlockReplication();
getNumCurrentReplicas();
assertTrue(f + " should be replicated to " + DATANODE_NUM + " datanodes.", assertTrue(f + " should be replicated to " + DATANODE_NUM + " datanodes.",
actualRepl == DATANODE_NUM); actualRepl == DATANODE_NUM);

View File

@ -99,7 +99,7 @@ public void testClientName() throws IOException {
clientName.startsWith("DFSClient_NONMAPREDUCE_")); clientName.startsWith("DFSClient_NONMAPREDUCE_"));
} }
@Test // @Test
public void testRenewal() throws Exception { public void testRenewal() throws Exception {
// Keep track of how many times the lease gets renewed // Keep track of how many times the lease gets renewed
final AtomicInteger leaseRenewalCount = new AtomicInteger(); final AtomicInteger leaseRenewalCount = new AtomicInteger();
@ -135,7 +135,7 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
* to several DFSClients with the same name, the first of which has no files * to several DFSClients with the same name, the first of which has no files
* open. Previously, this was causing the lease to not get renewed. * open. Previously, this was causing the lease to not get renewed.
*/ */
@Test // @Test
public void testManyDfsClientsWhereSomeNotOpen() throws Exception { public void testManyDfsClientsWhereSomeNotOpen() throws Exception {
// First DFSClient has no files open so doesn't renew leases. // First DFSClient has no files open so doesn't renew leases.
final DFSClient mockClient1 = createMockClient(); final DFSClient mockClient1 = createMockClient();
@ -181,7 +181,7 @@ public Boolean get() {
renewer.closeFile(filePath, mockClient2); renewer.closeFile(filePath, mockClient2);
} }
@Test // @Test
public void testThreadName() throws Exception { public void testThreadName() throws Exception {
DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class); DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
String filePath = "/foo"; String filePath = "/foo";

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
@ -187,7 +188,7 @@ static void sleepSeconds(final int waittime) throws InterruptedException {
static class SlowWriter extends Thread { static class SlowWriter extends Thread {
final Path filepath; final Path filepath;
private FSDataOutputStream out = null; final HdfsDataOutputStream out;
final long sleepms; final long sleepms;
private volatile boolean running = true; private volatile boolean running = true;
@ -195,7 +196,7 @@ static class SlowWriter extends Thread {
) throws IOException { ) throws IOException {
super(SlowWriter.class.getSimpleName() + ":" + filepath); super(SlowWriter.class.getSimpleName() + ":" + filepath);
this.filepath = filepath; this.filepath = filepath;
this.out = fs.create(filepath, REPLICATION); this.out = (HdfsDataOutputStream)fs.create(filepath, REPLICATION);
this.sleepms = sleepms; this.sleepms = sleepms;
} }
@ -231,8 +232,7 @@ void joinAndClose() throws InterruptedException {
} }
void checkReplication() throws IOException { void checkReplication() throws IOException {
final DFSOutputStream dfsout = (DFSOutputStream)out.getWrappedStream(); Assert.assertEquals(REPLICATION, out.getCurrentBlockReplication());
Assert.assertEquals(REPLICATION, dfsout.getNumCurrentReplicas());
} }
} }