HDFS-3088. Move FSDatasetInterface inner classes to a package.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1301661 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
af3163d1d1
commit
662b1887af
@ -234,6 +234,8 @@ Release 0.23.3 - UNRELEASED
|
||||
HDFS-3057. httpfs and hdfs launcher scripts should honor CATALINA_HOME
|
||||
and HADOOP_LIBEXEC_DIR (rvs via tucu)
|
||||
|
||||
HDFS-3088. Move FSDatasetInterface inner classes to a package. (szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-3024. Improve performance of stringification in addStoredBlock (todd)
|
||||
|
@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
||||
@ -72,7 +72,7 @@ class BlockPoolSliceScanner {
|
||||
private final AtomicLong lastScanTime = new AtomicLong();
|
||||
|
||||
private final DataNode datanode;
|
||||
private final FSDatasetInterface<? extends FSVolumeInterface> dataset;
|
||||
private final FSDatasetInterface<? extends FsVolumeSpi> dataset;
|
||||
|
||||
private final SortedSet<BlockScanInfo> blockInfoSet
|
||||
= new TreeSet<BlockScanInfo>();
|
||||
@ -134,7 +134,7 @@ class BlockPoolSliceScanner {
|
||||
}
|
||||
|
||||
BlockPoolSliceScanner(String bpid, DataNode datanode,
|
||||
FSDatasetInterface<? extends FSVolumeInterface> dataset,
|
||||
FSDatasetInterface<? extends FsVolumeSpi> dataset,
|
||||
Configuration conf) {
|
||||
this.datanode = datanode;
|
||||
this.dataset = dataset;
|
||||
|
@ -38,12 +38,12 @@ import org.apache.hadoop.fs.FSOutputSummer;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
@ -86,7 +86,7 @@ class BlockReceiver implements Closeable {
|
||||
private DataOutputStream mirrorOut;
|
||||
private Daemon responder = null;
|
||||
private DataTransferThrottler throttler;
|
||||
private FSDataset.BlockWriteStreams streams;
|
||||
private ReplicaOutputStreams streams;
|
||||
private DatanodeInfo srcDataNode = null;
|
||||
private Checksum partialCrc = null;
|
||||
private final DataNode datanode;
|
||||
@ -202,16 +202,16 @@ class BlockReceiver implements Closeable {
|
||||
this.bytesPerChecksum = diskChecksum.getBytesPerChecksum();
|
||||
this.checksumSize = diskChecksum.getChecksumSize();
|
||||
|
||||
this.out = streams.dataOut;
|
||||
this.out = streams.getDataOut();
|
||||
if (out instanceof FileOutputStream) {
|
||||
this.outFd = ((FileOutputStream)out).getFD();
|
||||
} else {
|
||||
LOG.warn("Could not get file descriptor for outputstream of class " +
|
||||
out.getClass());
|
||||
}
|
||||
this.cout = streams.checksumOut;
|
||||
this.cout = streams.getChecksumOut();
|
||||
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
|
||||
streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
|
||||
cout, HdfsConstants.SMALL_BUFFER_SIZE));
|
||||
// write data chunk header if creating a new replica
|
||||
if (isCreate) {
|
||||
BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
|
||||
@ -856,13 +856,13 @@ class BlockReceiver implements Closeable {
|
||||
//
|
||||
byte[] buf = new byte[sizePartialChunk];
|
||||
byte[] crcbuf = new byte[checksumSize];
|
||||
FSDataset.BlockInputStreams instr = null;
|
||||
ReplicaInputStreams instr = null;
|
||||
try {
|
||||
instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
|
||||
IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
|
||||
IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk);
|
||||
|
||||
// open meta file and read in crc value computer earlier
|
||||
IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length);
|
||||
IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length);
|
||||
} finally {
|
||||
IOUtils.closeStream(instr);
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
|
||||
/**************************************************
|
||||
* BlockVolumeChoosingPolicy allows a DataNode to
|
||||
@ -33,7 +33,7 @@ import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterfa
|
||||
*
|
||||
***************************************************/
|
||||
@InterfaceAudience.Private
|
||||
public interface BlockVolumeChoosingPolicy<V extends FSVolumeInterface> {
|
||||
public interface BlockVolumeChoosingPolicy<V extends FsVolumeSpi> {
|
||||
|
||||
/**
|
||||
* Returns a specific FSVolume after applying a suitable choice algorithm
|
||||
|
@ -31,7 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
|
||||
/**
|
||||
* DataBlockScanner manages block scanning for all the block pools. For each
|
||||
@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterfa
|
||||
public class DataBlockScanner implements Runnable {
|
||||
public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
|
||||
private final DataNode datanode;
|
||||
private final FSDatasetInterface<? extends FSVolumeInterface> dataset;
|
||||
private final FSDatasetInterface<? extends FsVolumeSpi> dataset;
|
||||
private final Configuration conf;
|
||||
|
||||
/**
|
||||
@ -55,7 +55,7 @@ public class DataBlockScanner implements Runnable {
|
||||
Thread blockScannerThread = null;
|
||||
|
||||
DataBlockScanner(DataNode datanode,
|
||||
FSDatasetInterface<? extends FSVolumeInterface> dataset,
|
||||
FSDatasetInterface<? extends FsVolumeSpi> dataset,
|
||||
Configuration conf) {
|
||||
this.datanode = datanode;
|
||||
this.dataset = dataset;
|
||||
|
@ -121,8 +121,8 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.Util;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
|
||||
@ -231,7 +231,7 @@ public class DataNode extends Configured
|
||||
|
||||
volatile boolean shouldRun = true;
|
||||
private BlockPoolManager blockPoolManager;
|
||||
volatile FSDatasetInterface<? extends FSVolumeInterface> data = null;
|
||||
volatile FSDatasetInterface<? extends FsVolumeSpi> data = null;
|
||||
private String clusterId = null;
|
||||
|
||||
public final static String EMPTY_DEL_HINT = "";
|
||||
|
@ -55,7 +55,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.MD5Hash;
|
||||
@ -511,7 +511,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||
checkAccess(out, true, block, blockToken,
|
||||
Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
|
||||
updateCurrentThreadName("Reading metadata for block " + block);
|
||||
final MetaDataInputStream metadataIn =
|
||||
final LengthInputStream metadataIn =
|
||||
datanode.data.getMetaDataInputStream(block);
|
||||
final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
|
||||
metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||
|
@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
|
||||
/**
|
||||
@ -157,13 +157,13 @@ public class DirectoryScanner implements Runnable {
|
||||
private final long blockId;
|
||||
private final File metaFile;
|
||||
private final File blockFile;
|
||||
private final FSVolumeInterface volume;
|
||||
private final FsVolumeSpi volume;
|
||||
|
||||
ScanInfo(long blockId) {
|
||||
this(blockId, null, null, null);
|
||||
}
|
||||
|
||||
ScanInfo(long blockId, File blockFile, File metaFile, FSVolumeInterface vol) {
|
||||
ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
|
||||
this.blockId = blockId;
|
||||
this.metaFile = metaFile;
|
||||
this.blockFile = blockFile;
|
||||
@ -182,7 +182,7 @@ public class DirectoryScanner implements Runnable {
|
||||
return blockId;
|
||||
}
|
||||
|
||||
FSVolumeInterface getVolume() {
|
||||
FsVolumeSpi getVolume() {
|
||||
return volume;
|
||||
}
|
||||
|
||||
@ -412,8 +412,8 @@ public class DirectoryScanner implements Runnable {
|
||||
|
||||
/** Is the given volume still valid in the dataset? */
|
||||
private static boolean isValid(final FSDatasetInterface<?> dataset,
|
||||
final FSVolumeInterface volume) {
|
||||
for (FSVolumeInterface vol : dataset.getVolumes()) {
|
||||
final FsVolumeSpi volume) {
|
||||
for (FsVolumeSpi vol : dataset.getVolumes()) {
|
||||
if (vol == volume) {
|
||||
return true;
|
||||
}
|
||||
@ -424,7 +424,7 @@ public class DirectoryScanner implements Runnable {
|
||||
/** Get lists of blocks on the disk sorted by blockId, per blockpool */
|
||||
private Map<String, ScanInfo[]> getDiskReport() {
|
||||
// First get list of data directories
|
||||
final List<? extends FSVolumeInterface> volumes = dataset.getVolumes();
|
||||
final List<? extends FsVolumeSpi> volumes = dataset.getVolumes();
|
||||
ArrayList<ScanInfoPerBlockPool> dirReports =
|
||||
new ArrayList<ScanInfoPerBlockPool>(volumes.size());
|
||||
|
||||
@ -473,9 +473,9 @@ public class DirectoryScanner implements Runnable {
|
||||
|
||||
private static class ReportCompiler
|
||||
implements Callable<ScanInfoPerBlockPool> {
|
||||
private FSVolumeInterface volume;
|
||||
private FsVolumeSpi volume;
|
||||
|
||||
public ReportCompiler(FSVolumeInterface volume) {
|
||||
public ReportCompiler(FsVolumeSpi volume) {
|
||||
this.volume = volume;
|
||||
}
|
||||
|
||||
@ -492,7 +492,7 @@ public class DirectoryScanner implements Runnable {
|
||||
}
|
||||
|
||||
/** Compile list {@link ScanInfo} for the blocks in the directory <dir> */
|
||||
private LinkedList<ScanInfo> compileReport(FSVolumeInterface vol, File dir,
|
||||
private LinkedList<ScanInfo> compileReport(FsVolumeSpi vol, File dir,
|
||||
LinkedList<ScanInfo> report) {
|
||||
File[] files;
|
||||
try {
|
||||
|
@ -61,7 +61,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
@ -548,7 +551,7 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||
*
|
||||
* It uses the {@link FSDataset} object for synchronization.
|
||||
*/
|
||||
static class FSVolume implements FSVolumeInterface {
|
||||
static class FSVolume implements FsVolumeSpi {
|
||||
private final FSDataset dataset;
|
||||
private final Map<String, BlockPoolSlice> map = new HashMap<String, BlockPoolSlice>();
|
||||
private final File currentDir; // <StorageDirectory>/current
|
||||
@ -865,7 +868,7 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||
|
||||
private long getRemaining() throws IOException {
|
||||
long remaining = 0L;
|
||||
for (FSVolumeInterface vol : volumes) {
|
||||
for (FsVolumeSpi vol : volumes) {
|
||||
remaining += vol.getAvailable();
|
||||
}
|
||||
return remaining;
|
||||
@ -1052,13 +1055,13 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||
}
|
||||
|
||||
@Override // FSDatasetInterface
|
||||
public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b)
|
||||
public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
|
||||
throws IOException {
|
||||
final File meta = getMetaFile(b);
|
||||
if (meta == null || !meta.exists()) {
|
||||
return null;
|
||||
}
|
||||
return new MetaDataInputStream(new FileInputStream(meta), meta.length());
|
||||
return new LengthInputStream(new FileInputStream(meta), meta.length());
|
||||
}
|
||||
|
||||
private final DataNode datanode;
|
||||
@ -1287,7 +1290,7 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||
* Returns handles to the block file and its metadata file
|
||||
*/
|
||||
@Override // FSDatasetInterface
|
||||
public synchronized BlockInputStreams getTmpInputStreams(ExtendedBlock b,
|
||||
public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
||||
long blkOffset, long ckoff) throws IOException {
|
||||
ReplicaInfo info = getReplicaInfo(b);
|
||||
File blockFile = info.getBlockFile();
|
||||
@ -1300,7 +1303,7 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||
if (ckoff > 0) {
|
||||
metaInFile.seek(ckoff);
|
||||
}
|
||||
return new BlockInputStreams(new FileInputStream(blockInFile.getFD()),
|
||||
return new ReplicaInputStreams(new FileInputStream(blockInFile.getFD()),
|
||||
new FileInputStream(metaInFile.getFD()));
|
||||
}
|
||||
|
||||
@ -1742,9 +1745,9 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||
* last checksum will be overwritten.
|
||||
*/
|
||||
@Override // FSDatasetInterface
|
||||
public void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams streams,
|
||||
public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams,
|
||||
int checksumSize) throws IOException {
|
||||
FileOutputStream file = (FileOutputStream) streams.checksumOut;
|
||||
FileOutputStream file = (FileOutputStream) streams.getChecksumOut();
|
||||
FileChannel channel = file.getChannel();
|
||||
long oldPos = channel.position();
|
||||
long newPos = oldPos - checksumSize;
|
||||
@ -2195,7 +2198,7 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||
*/
|
||||
@Override
|
||||
public void checkAndUpdate(String bpid, long blockId, File diskFile,
|
||||
File diskMetaFile, FSVolumeInterface vol) {
|
||||
File diskMetaFile, FsVolumeSpi vol) {
|
||||
Block corruptBlock = null;
|
||||
ReplicaInfo memBlockInfo;
|
||||
synchronized (this) {
|
||||
|
@ -18,12 +18,9 @@
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.File;
|
||||
import java.io.FilterInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@ -34,11 +31,13 @@ import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
@ -50,7 +49,7 @@ import org.apache.hadoop.util.ReflectionUtils;
|
||||
*
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface FSDatasetInterface<V extends FSDatasetInterface.FSVolumeInterface>
|
||||
public interface FSDatasetInterface<V extends FsVolumeSpi>
|
||||
extends FSDatasetMBean {
|
||||
/**
|
||||
* A factory for creating FSDatasetInterface objects.
|
||||
@ -77,24 +76,6 @@ public interface FSDatasetInterface<V extends FSDatasetInterface.FSVolumeInterfa
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is an interface for the underlying volume.
|
||||
* @see org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume
|
||||
*/
|
||||
interface FSVolumeInterface {
|
||||
/** @return a list of block pools. */
|
||||
public String[] getBlockPoolList();
|
||||
|
||||
/** @return the available storage space in bytes. */
|
||||
public long getAvailable() throws IOException;
|
||||
|
||||
/** @return the path to the volume */
|
||||
public String getPath(String bpid) throws IOException;
|
||||
|
||||
/** @return the directory for the finalized blocks in the block pool. */
|
||||
public File getFinalizedDir(String bpid) throws IOException;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create rolling logs.
|
||||
*
|
||||
@ -121,32 +102,15 @@ public interface FSDatasetInterface<V extends FSDatasetInterface.FSVolumeInterfa
|
||||
* as corrupted.
|
||||
*/
|
||||
public void checkAndUpdate(String bpid, long blockId, File diskFile,
|
||||
File diskMetaFile, FSVolumeInterface vol);
|
||||
File diskMetaFile, FsVolumeSpi vol);
|
||||
|
||||
/**
|
||||
* This class provides the input stream and length of the metadata
|
||||
* of a block
|
||||
*
|
||||
*/
|
||||
static class MetaDataInputStream extends FilterInputStream {
|
||||
MetaDataInputStream(InputStream stream, long len) {
|
||||
super(stream);
|
||||
length = len;
|
||||
}
|
||||
private long length;
|
||||
|
||||
public long getLength() {
|
||||
return length;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param b - the block
|
||||
* @return a stream if the meta-data of the block exists;
|
||||
* otherwise, return null.
|
||||
* @throws IOException
|
||||
*/
|
||||
public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b
|
||||
public LengthInputStream getMetaDataInputStream(ExtendedBlock b
|
||||
) throws IOException;
|
||||
|
||||
/**
|
||||
@ -197,58 +161,10 @@ public interface FSDatasetInterface<V extends FSDatasetInterface.FSVolumeInterfa
|
||||
* starting at the offset
|
||||
* @throws IOException
|
||||
*/
|
||||
public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
|
||||
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
|
||||
long ckoff) throws IOException;
|
||||
|
||||
/**
|
||||
*
|
||||
* This class contains the output streams for the data and checksum
|
||||
* of a block
|
||||
*
|
||||
*/
|
||||
static class BlockWriteStreams {
|
||||
OutputStream dataOut;
|
||||
OutputStream checksumOut;
|
||||
DataChecksum checksum;
|
||||
|
||||
BlockWriteStreams(OutputStream dOut, OutputStream cOut,
|
||||
DataChecksum checksum) {
|
||||
dataOut = dOut;
|
||||
checksumOut = cOut;
|
||||
this.checksum = checksum;
|
||||
}
|
||||
|
||||
void close() {
|
||||
IOUtils.closeStream(dataOut);
|
||||
IOUtils.closeStream(checksumOut);
|
||||
}
|
||||
|
||||
DataChecksum getChecksum() {
|
||||
return checksum;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This class contains the input streams for the data and checksum
|
||||
* of a block
|
||||
*/
|
||||
static class BlockInputStreams implements Closeable {
|
||||
final InputStream dataIn;
|
||||
final InputStream checksumIn;
|
||||
|
||||
BlockInputStreams(InputStream dataIn, InputStream checksumIn) {
|
||||
this.dataIn = dataIn;
|
||||
this.checksumIn = checksumIn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
IOUtils.closeStream(dataIn);
|
||||
IOUtils.closeStream(checksumIn);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a temporary replica and returns the meta information of the replica
|
||||
*
|
||||
* @param b block
|
||||
@ -395,7 +311,7 @@ public interface FSDatasetInterface<V extends FSDatasetInterface.FSVolumeInterfa
|
||||
* @param checksumSize number of bytes each checksum has
|
||||
* @throws IOException
|
||||
*/
|
||||
public void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams stream,
|
||||
public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams stream,
|
||||
int checksumSize) throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -21,7 +21,7 @@ import java.io.File;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
|
||||
/**
|
||||
* This class describes a replica that has been finalized.
|
||||
@ -38,7 +38,7 @@ class FinalizedReplica extends ReplicaInfo {
|
||||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
FinalizedReplica(long blockId, long len, long genStamp,
|
||||
FSVolumeInterface vol, File dir) {
|
||||
FsVolumeSpi vol, File dir) {
|
||||
super(blockId, len, genStamp, vol, dir);
|
||||
}
|
||||
|
||||
@ -48,7 +48,7 @@ class FinalizedReplica extends ReplicaInfo {
|
||||
* @param vol volume where replica is located
|
||||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
FinalizedReplica(Block block, FSVolumeInterface vol, File dir) {
|
||||
FinalizedReplica(Block block, FsVolumeSpi vol, File dir) {
|
||||
super(block, vol, dir);
|
||||
}
|
||||
|
||||
|
@ -21,7 +21,7 @@ import java.io.File;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
|
||||
/** This class represents replicas being written.
|
||||
* Those are the replicas that
|
||||
@ -36,7 +36,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
|
||||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
ReplicaBeingWritten(long blockId, long genStamp,
|
||||
FSVolumeInterface vol, File dir) {
|
||||
FsVolumeSpi vol, File dir) {
|
||||
super( blockId, genStamp, vol, dir);
|
||||
}
|
||||
|
||||
@ -48,7 +48,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
|
||||
* @param writer a thread that is writing to this replica
|
||||
*/
|
||||
ReplicaBeingWritten(Block block,
|
||||
FSVolumeInterface vol, File dir, Thread writer) {
|
||||
FsVolumeSpi vol, File dir, Thread writer) {
|
||||
super( block, vol, dir, writer);
|
||||
}
|
||||
|
||||
@ -62,7 +62,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
|
||||
* @param writer a thread that is writing to this replica
|
||||
*/
|
||||
ReplicaBeingWritten(long blockId, long len, long genStamp,
|
||||
FSVolumeInterface vol, File dir, Thread writer ) {
|
||||
FsVolumeSpi vol, File dir, Thread writer ) {
|
||||
super( blockId, len, genStamp, vol, dir, writer);
|
||||
}
|
||||
|
||||
|
@ -24,8 +24,8 @@ import java.io.RandomAccessFile;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
@ -53,7 +53,7 @@ class ReplicaInPipeline extends ReplicaInfo
|
||||
* @param state replica state
|
||||
*/
|
||||
ReplicaInPipeline(long blockId, long genStamp,
|
||||
FSVolumeInterface vol, File dir) {
|
||||
FsVolumeSpi vol, File dir) {
|
||||
this( blockId, 0L, genStamp, vol, dir, Thread.currentThread());
|
||||
}
|
||||
|
||||
@ -65,7 +65,7 @@ class ReplicaInPipeline extends ReplicaInfo
|
||||
* @param writer a thread that is writing to this replica
|
||||
*/
|
||||
ReplicaInPipeline(Block block,
|
||||
FSVolumeInterface vol, File dir, Thread writer) {
|
||||
FsVolumeSpi vol, File dir, Thread writer) {
|
||||
this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
|
||||
vol, dir, writer);
|
||||
}
|
||||
@ -80,7 +80,7 @@ class ReplicaInPipeline extends ReplicaInfo
|
||||
* @param writer a thread that is writing to this replica
|
||||
*/
|
||||
ReplicaInPipeline(long blockId, long len, long genStamp,
|
||||
FSVolumeInterface vol, File dir, Thread writer ) {
|
||||
FsVolumeSpi vol, File dir, Thread writer ) {
|
||||
super( blockId, len, genStamp, vol, dir);
|
||||
this.bytesAcked = len;
|
||||
this.bytesOnDisk = len;
|
||||
@ -168,7 +168,7 @@ class ReplicaInPipeline extends ReplicaInfo
|
||||
}
|
||||
|
||||
@Override // ReplicaInPipelineInterface
|
||||
public BlockWriteStreams createStreams(boolean isCreate,
|
||||
public ReplicaOutputStreams createStreams(boolean isCreate,
|
||||
DataChecksum requestedChecksum) throws IOException {
|
||||
File blockFile = getBlockFile();
|
||||
File metaFile = getMetaFile();
|
||||
@ -234,7 +234,7 @@ class ReplicaInPipeline extends ReplicaInfo
|
||||
blockOut.getChannel().position(blockDiskSize);
|
||||
crcOut.getChannel().position(crcDiskSize);
|
||||
}
|
||||
return new BlockWriteStreams(blockOut, crcOut, checksum);
|
||||
return new ReplicaOutputStreams(blockOut, crcOut, checksum);
|
||||
} catch (IOException e) {
|
||||
IOUtils.closeStream(blockOut);
|
||||
IOUtils.closeStream(metaRAF);
|
||||
|
@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
/**
|
||||
@ -66,6 +66,6 @@ interface ReplicaInPipelineInterface extends Replica {
|
||||
* @return output streams for writing
|
||||
* @throws IOException if any error occurs
|
||||
*/
|
||||
public BlockWriteStreams createStreams(boolean isCreate,
|
||||
public ReplicaOutputStreams createStreams(boolean isCreate,
|
||||
DataChecksum requestedChecksum) throws IOException;
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.HardLink;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
||||
/**
|
||||
@ -36,7 +36,7 @@ import org.apache.hadoop.io.IOUtils;
|
||||
@InterfaceAudience.Private
|
||||
abstract public class ReplicaInfo extends Block implements Replica {
|
||||
/** volume where the replica belongs */
|
||||
private FSVolumeInterface volume;
|
||||
private FsVolumeSpi volume;
|
||||
/** directory where block & meta files belong */
|
||||
private File dir;
|
||||
|
||||
@ -47,7 +47,7 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
||||
* @param vol volume where replica is located
|
||||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
ReplicaInfo(long blockId, long genStamp, FSVolumeInterface vol, File dir) {
|
||||
ReplicaInfo(long blockId, long genStamp, FsVolumeSpi vol, File dir) {
|
||||
this( blockId, 0L, genStamp, vol, dir);
|
||||
}
|
||||
|
||||
@ -57,7 +57,7 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
||||
* @param vol volume where replica is located
|
||||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
ReplicaInfo(Block block, FSVolumeInterface vol, File dir) {
|
||||
ReplicaInfo(Block block, FsVolumeSpi vol, File dir) {
|
||||
this(block.getBlockId(), block.getNumBytes(),
|
||||
block.getGenerationStamp(), vol, dir);
|
||||
}
|
||||
@ -71,7 +71,7 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
||||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
ReplicaInfo(long blockId, long len, long genStamp,
|
||||
FSVolumeInterface vol, File dir) {
|
||||
FsVolumeSpi vol, File dir) {
|
||||
super(blockId, len, genStamp);
|
||||
this.volume = vol;
|
||||
this.dir = dir;
|
||||
@ -113,14 +113,14 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
||||
* Get the volume where this replica is located on disk
|
||||
* @return the volume where this replica is located on disk
|
||||
*/
|
||||
FSVolumeInterface getVolume() {
|
||||
FsVolumeSpi getVolume() {
|
||||
return volume;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the volume where this replica is located on disk
|
||||
*/
|
||||
void setVolume(FSVolumeInterface vol) {
|
||||
void setVolume(FsVolumeSpi vol) {
|
||||
this.volume = vol;
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
|
||||
import java.io.File;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
|
||||
/**
|
||||
@ -145,7 +145,7 @@ class ReplicaUnderRecovery extends ReplicaInfo {
|
||||
}
|
||||
|
||||
@Override //ReplicaInfo
|
||||
void setVolume(FSVolumeInterface vol) {
|
||||
void setVolume(FsVolumeSpi vol) {
|
||||
super.setVolume(vol);
|
||||
original.setVolume(vol);
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ import java.io.File;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
|
||||
/**
|
||||
* This class represents a replica that is waiting to be recovered.
|
||||
@ -44,7 +44,7 @@ class ReplicaWaitingToBeRecovered extends ReplicaInfo {
|
||||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
ReplicaWaitingToBeRecovered(long blockId, long len, long genStamp,
|
||||
FSVolumeInterface vol, File dir) {
|
||||
FsVolumeSpi vol, File dir) {
|
||||
super(blockId, len, genStamp, vol, dir);
|
||||
}
|
||||
|
||||
@ -54,7 +54,7 @@ class ReplicaWaitingToBeRecovered extends ReplicaInfo {
|
||||
* @param vol volume where replica is located
|
||||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
ReplicaWaitingToBeRecovered(Block block, FSVolumeInterface vol, File dir) {
|
||||
ReplicaWaitingToBeRecovered(Block block, FsVolumeSpi vol, File dir) {
|
||||
super(block, vol, dir);
|
||||
}
|
||||
|
||||
|
@ -20,10 +20,10 @@ package org.apache.hadoop.hdfs.server.datanode;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||
|
||||
public class RoundRobinVolumesPolicy<V extends FSVolumeInterface>
|
||||
public class RoundRobinVolumesPolicy<V extends FsVolumeSpi>
|
||||
implements BlockVolumeChoosingPolicy<V> {
|
||||
|
||||
private int curVolume = 0;
|
||||
|
@ -0,0 +1,38 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* This is an interface for the underlying volume.
|
||||
*/
|
||||
public interface FsVolumeSpi {
|
||||
/** @return a list of block pools. */
|
||||
public String[] getBlockPoolList();
|
||||
|
||||
/** @return the available storage space in bytes. */
|
||||
public long getAvailable() throws IOException;
|
||||
|
||||
/** @return the path to the volume */
|
||||
public String getPath(String bpid) throws IOException;
|
||||
|
||||
/** @return the directory for the finalized blocks in the block pool. */
|
||||
public File getFinalizedDir(String bpid) throws IOException;
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
|
||||
|
||||
import java.io.FilterInputStream;
|
||||
import java.io.InputStream;
|
||||
|
||||
/**
|
||||
* An input stream with length.
|
||||
*/
|
||||
public class LengthInputStream extends FilterInputStream {
|
||||
|
||||
private final long length;
|
||||
|
||||
/**
|
||||
* Create an stream.
|
||||
* @param in the underlying input stream.
|
||||
* @param length the length of the stream.
|
||||
*/
|
||||
public LengthInputStream(InputStream in, long length) {
|
||||
super(in);
|
||||
this.length = length;
|
||||
}
|
||||
|
||||
/** @return the length. */
|
||||
public long getLength() {
|
||||
return length;
|
||||
}
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.InputStream;
|
||||
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
||||
/**
|
||||
* Contains the input streams for the data and checksum of a replica.
|
||||
*/
|
||||
public class ReplicaInputStreams implements Closeable {
|
||||
private final InputStream dataIn;
|
||||
private final InputStream checksumIn;
|
||||
|
||||
/** Create an object with a data input stream and a checksum input stream. */
|
||||
public ReplicaInputStreams(InputStream dataIn, InputStream checksumIn) {
|
||||
this.dataIn = dataIn;
|
||||
this.checksumIn = checksumIn;
|
||||
}
|
||||
|
||||
/** @return the data input stream. */
|
||||
public InputStream getDataIn() {
|
||||
return dataIn;
|
||||
}
|
||||
|
||||
/** @return the checksum input stream. */
|
||||
public InputStream getChecksumIn() {
|
||||
return checksumIn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
IOUtils.closeStream(dataIn);
|
||||
IOUtils.closeStream(checksumIn);
|
||||
}
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
/**
|
||||
* Contains the output streams for the data and checksum of a replica.
|
||||
*/
|
||||
public class ReplicaOutputStreams implements Closeable {
|
||||
private final OutputStream dataOut;
|
||||
private final OutputStream checksumOut;
|
||||
private final DataChecksum checksum;
|
||||
|
||||
/**
|
||||
* Create an object with a data output stream, a checksum output stream
|
||||
* and a checksum.
|
||||
*/
|
||||
public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut,
|
||||
DataChecksum checksum) {
|
||||
this.dataOut = dataOut;
|
||||
this.checksumOut = checksumOut;
|
||||
this.checksum = checksum;
|
||||
}
|
||||
|
||||
/** @return the data output stream. */
|
||||
public OutputStream getDataOut() {
|
||||
return dataOut;
|
||||
}
|
||||
|
||||
/** @return the checksum output stream. */
|
||||
public OutputStream getChecksumOut() {
|
||||
return checksumOut;
|
||||
}
|
||||
|
||||
/** @return the checksum. */
|
||||
public DataChecksum getChecksum() {
|
||||
return checksum;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
IOUtils.closeStream(dataOut);
|
||||
IOUtils.closeStream(checksumOut);
|
||||
}
|
||||
}
|
@ -39,6 +39,10 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
@ -61,8 +65,7 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
*
|
||||
* Note the synchronization is coarse grained - it is at each method.
|
||||
*/
|
||||
public class SimulatedFSDataset
|
||||
implements FSDatasetInterface<FSDatasetInterface.FSVolumeInterface> {
|
||||
public class SimulatedFSDataset implements FSDatasetInterface<FsVolumeSpi> {
|
||||
static class Factory extends FSDatasetInterface.Factory<SimulatedFSDataset> {
|
||||
@Override
|
||||
public SimulatedFSDataset createFSDatasetInterface(DataNode datanode,
|
||||
@ -215,14 +218,14 @@ public class SimulatedFSDataset
|
||||
}
|
||||
|
||||
@Override
|
||||
synchronized public BlockWriteStreams createStreams(boolean isCreate,
|
||||
synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
|
||||
DataChecksum requestedChecksum) throws IOException {
|
||||
if (finalized) {
|
||||
throw new IOException("Trying to write to a finalized replica "
|
||||
+ theBlock);
|
||||
} else {
|
||||
SimulatedOutputStream crcStream = new SimulatedOutputStream();
|
||||
return new BlockWriteStreams(oStream, crcStream, requestedChecksum);
|
||||
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum);
|
||||
}
|
||||
}
|
||||
|
||||
@ -688,13 +691,13 @@ public class SimulatedFSDataset
|
||||
|
||||
/** Not supported */
|
||||
@Override // FSDatasetInterface
|
||||
public BlockInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
|
||||
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
|
||||
long ckoff) throws IOException {
|
||||
throw new IOException("Not supported");
|
||||
}
|
||||
|
||||
@Override // FSDatasetInterface
|
||||
public synchronized MetaDataInputStream getMetaDataInputStream(ExtendedBlock b
|
||||
public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b
|
||||
) throws IOException {
|
||||
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
|
||||
BInfo binfo = map.get(b.getLocalBlock());
|
||||
@ -706,7 +709,7 @@ public class SimulatedFSDataset
|
||||
" is being written, its meta cannot be read");
|
||||
}
|
||||
final SimulatedInputStream sin = binfo.getMetaIStream();
|
||||
return new MetaDataInputStream(sin, sin.getLength());
|
||||
return new LengthInputStream(sin, sin.getLength());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -716,7 +719,7 @@ public class SimulatedFSDataset
|
||||
|
||||
@Override // FSDatasetInterface
|
||||
public synchronized void adjustCrcChannelPosition(ExtendedBlock b,
|
||||
BlockWriteStreams stream,
|
||||
ReplicaOutputStreams stream,
|
||||
int checksumSize)
|
||||
throws IOException {
|
||||
}
|
||||
@ -959,12 +962,12 @@ public class SimulatedFSDataset
|
||||
|
||||
@Override
|
||||
public void checkAndUpdate(String bpid, long blockId, File diskFile,
|
||||
File diskMetaFile, FSVolumeInterface vol) {
|
||||
File diskMetaFile, FsVolumeSpi vol) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FSVolumeInterface> getVolumes() {
|
||||
public List<FsVolumeSpi> getVolumes() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
@ -36,7 +36,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||
@ -535,11 +535,11 @@ public class TestBlockRecovery {
|
||||
LOG.debug("Running " + GenericTestUtils.getMethodName());
|
||||
}
|
||||
ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(block);
|
||||
BlockWriteStreams streams = null;
|
||||
ReplicaOutputStreams streams = null;
|
||||
try {
|
||||
streams = replicaInfo.createStreams(true,
|
||||
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
|
||||
streams.checksumOut.write('a');
|
||||
streams.getChecksumOut().write('a');
|
||||
dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
|
||||
try {
|
||||
dn.syncBlock(rBlock, initBlockRecords(dn));
|
||||
|
@ -37,7 +37,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
@ -98,7 +98,7 @@ public class TestDatanodeRestart {
|
||||
out.write(writeBuf);
|
||||
out.hflush();
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
for (FSVolumeInterface v : dn.data.getVolumes()) {
|
||||
for (FsVolumeSpi v : dn.data.getVolumes()) {
|
||||
FSVolume volume = (FSVolume)v;
|
||||
File currentDir = volume.getCurrentDir().getParentFile().getParentFile();
|
||||
File rbwDir = new File(currentDir, "rbw");
|
||||
|
@ -21,7 +21,7 @@ import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.junit.Assert;
|
||||
@ -33,19 +33,19 @@ public class TestRoundRobinVolumesPolicy {
|
||||
// Test the Round-Robin block-volume choosing algorithm.
|
||||
@Test
|
||||
public void testRR() throws Exception {
|
||||
final List<FSVolumeInterface> volumes = new ArrayList<FSVolumeInterface>();
|
||||
final List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
|
||||
|
||||
// First volume, with 100 bytes of space.
|
||||
volumes.add(Mockito.mock(FSVolumeInterface.class));
|
||||
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||
Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);
|
||||
|
||||
// Second volume, with 200 bytes of space.
|
||||
volumes.add(Mockito.mock(FSVolumeInterface.class));
|
||||
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
final RoundRobinVolumesPolicy<FSVolumeInterface> policy =
|
||||
(RoundRobinVolumesPolicy<FSVolumeInterface>)ReflectionUtils.newInstance(
|
||||
final RoundRobinVolumesPolicy<FsVolumeSpi> policy =
|
||||
(RoundRobinVolumesPolicy<FsVolumeSpi>)ReflectionUtils.newInstance(
|
||||
RoundRobinVolumesPolicy.class, null);
|
||||
|
||||
// Test two rounds of round-robin choosing
|
||||
@ -71,18 +71,18 @@ public class TestRoundRobinVolumesPolicy {
|
||||
@Test
|
||||
public void testRRPolicyExceptionMessage()
|
||||
throws Exception {
|
||||
final List<FSVolumeInterface> volumes = new ArrayList<FSVolumeInterface>();
|
||||
final List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
|
||||
|
||||
// First volume, with 500 bytes of space.
|
||||
volumes.add(Mockito.mock(FSVolumeInterface.class));
|
||||
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||
Mockito.when(volumes.get(0).getAvailable()).thenReturn(500L);
|
||||
|
||||
// Second volume, with 600 bytes of space.
|
||||
volumes.add(Mockito.mock(FSVolumeInterface.class));
|
||||
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||
Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L);
|
||||
|
||||
final RoundRobinVolumesPolicy<FSVolumeInterface> policy
|
||||
= new RoundRobinVolumesPolicy<FSVolumeInterface>();
|
||||
final RoundRobinVolumesPolicy<FsVolumeSpi> policy
|
||||
= new RoundRobinVolumesPolicy<FsVolumeSpi>();
|
||||
int blockSize = 700;
|
||||
try {
|
||||
policy.chooseVolume(volumes, blockSize);
|
||||
|
@ -29,7 +29,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
/**
|
||||
@ -63,10 +63,10 @@ public class TestSimulatedFSDataset extends TestCase {
|
||||
// we pass expected len as zero, - fsdataset should use the sizeof actual
|
||||
// data written
|
||||
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
|
||||
BlockWriteStreams out = bInfo.createStreams(true,
|
||||
ReplicaOutputStreams out = bInfo.createStreams(true,
|
||||
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
|
||||
try {
|
||||
OutputStream dataOut = out.dataOut;
|
||||
OutputStream dataOut = out.getDataOut();
|
||||
assertEquals(0, fsdataset.getLength(b));
|
||||
for (int j=1; j <= blockIdToLen(i); ++j) {
|
||||
dataOut.write(j);
|
||||
|
Loading…
x
Reference in New Issue
Block a user