HDFS-3155. Clean up FSDataset implemenation related code.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1306582 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0475795066
commit
99a68a1423
@ -277,6 +277,8 @@ Release 2.0.0 - UNRELEASED
|
||||
HDFS-3129. NetworkTopology: add test that getLeaf should check for
|
||||
invalid topologies (Colin Patrick McCabe via eli)
|
||||
|
||||
HDFS-3155. Clean up FSDataset implemenation related code. (szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-3024. Improve performance of stringification in addStoredBlock (todd)
|
||||
|
@ -358,9 +358,8 @@ class BlockReceiver implements Closeable {
|
||||
* This does not verify the original checksums, under the assumption
|
||||
* that they have already been validated.
|
||||
*/
|
||||
private void translateChunks( byte[] dataBuf, int dataOff, int len,
|
||||
byte[] checksumBuf, int checksumOff )
|
||||
throws IOException {
|
||||
private void translateChunks( byte[] dataBuf, int dataOff, int len,
|
||||
byte[] checksumBuf, int checksumOff ) {
|
||||
if (len == 0) return;
|
||||
|
||||
int numChunks = (len - 1)/bytesPerChecksum + 1;
|
||||
@ -702,7 +701,7 @@ class BlockReceiver implements Closeable {
|
||||
return lastPacketInBlock?-1:len;
|
||||
}
|
||||
|
||||
private void dropOsCacheBehindWriter(long offsetInBlock) throws IOException {
|
||||
private void dropOsCacheBehindWriter(long offsetInBlock) {
|
||||
try {
|
||||
if (outFd != null &&
|
||||
offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) {
|
||||
|
@ -111,10 +111,6 @@ class BlockSender implements java.io.Closeable {
|
||||
|
||||
/** the block to read from */
|
||||
private final ExtendedBlock block;
|
||||
/** the replica to read from */
|
||||
private final Replica replica;
|
||||
/** The visible length of a replica. */
|
||||
private final long replicaVisibleLength;
|
||||
/** Stream to read block data from */
|
||||
private InputStream blockIn;
|
||||
/** updated while using transferTo() */
|
||||
@ -189,17 +185,18 @@ class BlockSender implements java.io.Closeable {
|
||||
this.readaheadLength = datanode.getDnConf().readaheadLength;
|
||||
this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
|
||||
|
||||
final Replica replica;
|
||||
final long replicaVisibleLength;
|
||||
synchronized(datanode.data) {
|
||||
this.replica = getReplica(block, datanode);
|
||||
this.replicaVisibleLength = replica.getVisibleLength();
|
||||
replica = getReplica(block, datanode);
|
||||
replicaVisibleLength = replica.getVisibleLength();
|
||||
}
|
||||
// if there is a write in progress
|
||||
ChunkChecksum chunkChecksum = null;
|
||||
if (replica instanceof ReplicaBeingWritten) {
|
||||
long minEndOffset = startOffset + length;
|
||||
waitForMinLength((ReplicaBeingWritten)replica, minEndOffset);
|
||||
ReplicaInPipeline rip = (ReplicaInPipeline) replica;
|
||||
chunkChecksum = rip.getLastChecksumAndDataLen();
|
||||
final ReplicaBeingWritten rbw = (ReplicaBeingWritten)replica;
|
||||
waitForMinLength(rbw, startOffset + length);
|
||||
chunkChecksum = rbw.getLastChecksumAndDataLen();
|
||||
}
|
||||
|
||||
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
||||
|
@ -760,8 +760,8 @@ public class DataStorage extends Storage {
|
||||
/**
|
||||
* Add bpStorage into bpStorageMap
|
||||
*/
|
||||
private void addBlockPoolStorage(String bpID, BlockPoolSliceStorage bpStorage)
|
||||
throws IOException {
|
||||
private void addBlockPoolStorage(String bpID, BlockPoolSliceStorage bpStorage
|
||||
) {
|
||||
if (!this.bpStorageMap.containsKey(bpID)) {
|
||||
this.bpStorageMap.put(bpID, bpStorage);
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ class DatanodeUtil {
|
||||
|
||||
static final String UNLINK_BLOCK_SUFFIX = ".unlinked";
|
||||
|
||||
private final static String DISK_ERROR = "Possible disk error on file creation: ";
|
||||
private static final String DISK_ERROR = "Possible disk error: ";
|
||||
|
||||
/** Get the cause of an I/O exception if caused by a possible disk error
|
||||
* @param ioe an I/O exception
|
||||
|
@ -1800,7 +1800,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
||||
ReplicaInfo replicaInfo) throws IOException {
|
||||
FinalizedReplica newReplicaInfo = null;
|
||||
if (replicaInfo.getState() == ReplicaState.RUR &&
|
||||
((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() ==
|
||||
((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() ==
|
||||
ReplicaState.FINALIZED) {
|
||||
newReplicaInfo = (FinalizedReplica)
|
||||
((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
|
||||
@ -2036,7 +2036,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
||||
ReplicaState replicaState = dinfo.getState();
|
||||
if (replicaState == ReplicaState.FINALIZED ||
|
||||
(replicaState == ReplicaState.RUR &&
|
||||
((ReplicaUnderRecovery)dinfo).getOrignalReplicaState() ==
|
||||
((ReplicaUnderRecovery)dinfo).getOriginalReplica().getState() ==
|
||||
ReplicaState.FINALIZED)) {
|
||||
v.clearPath(bpid, parent);
|
||||
}
|
||||
|
@ -86,14 +86,6 @@ class ReplicaUnderRecovery extends ReplicaInfo {
|
||||
ReplicaInfo getOriginalReplica() {
|
||||
return original;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the original replica's state
|
||||
* @return the original replica's state
|
||||
*/
|
||||
ReplicaState getOrignalReplicaState() {
|
||||
return original.getState();
|
||||
}
|
||||
|
||||
@Override //ReplicaInfo
|
||||
boolean isUnlinked() {
|
||||
@ -170,6 +162,6 @@ class ReplicaUnderRecovery extends ReplicaInfo {
|
||||
ReplicaRecoveryInfo createInfo() {
|
||||
return new ReplicaRecoveryInfo(original.getBlockId(),
|
||||
original.getBytesOnDisk(), original.getGenerationStamp(),
|
||||
getOrignalReplicaState());
|
||||
original.getState());
|
||||
}
|
||||
}
|
||||
|
@ -80,7 +80,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||
@ -1664,7 +1663,7 @@ public class MiniDFSCluster {
|
||||
public void triggerBlockReports()
|
||||
throws IOException {
|
||||
for (DataNode dn : getDataNodes()) {
|
||||
DataNodeAdapter.triggerBlockReport(dn);
|
||||
DataNodeTestUtils.triggerBlockReport(dn);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1672,14 +1671,14 @@ public class MiniDFSCluster {
|
||||
public void triggerDeletionReports()
|
||||
throws IOException {
|
||||
for (DataNode dn : getDataNodes()) {
|
||||
DataNodeAdapter.triggerDeletionReport(dn);
|
||||
DataNodeTestUtils.triggerDeletionReport(dn);
|
||||
}
|
||||
}
|
||||
|
||||
public void triggerHeartbeats()
|
||||
throws IOException {
|
||||
for (DataNode dn : getDataNodes()) {
|
||||
DataNodeAdapter.triggerHeartbeat(dn);
|
||||
DataNodeTestUtils.triggerHeartbeat(dn);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -18,8 +18,8 @@
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -41,7 +41,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
@ -454,7 +454,7 @@ public class TestLeaseRecovery2 {
|
||||
// Make sure the DNs don't send a heartbeat for a while, so the blocks
|
||||
// won't actually get completed during lease recovery.
|
||||
for (DataNode dn : cluster.getDataNodes()) {
|
||||
DataNodeAdapter.setHeartbeatsDisabledForTests(dn, true);
|
||||
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
|
||||
}
|
||||
|
||||
// set the hard limit to be 1 second
|
||||
@ -474,7 +474,7 @@ public class TestLeaseRecovery2 {
|
||||
|
||||
// Let the DNs send heartbeats again.
|
||||
for (DataNode dn : cluster.getDataNodes()) {
|
||||
DataNodeAdapter.setHeartbeatsDisabledForTests(dn, false);
|
||||
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
|
||||
}
|
||||
|
||||
cluster.waitActive();
|
||||
|
@ -17,6 +17,13 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
@ -26,22 +33,16 @@ import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.Replica;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.After;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
public class TestPipelines {
|
||||
public static final Log LOG = LogFactory.getLog(TestPipelines.class);
|
||||
|
||||
@ -105,7 +106,7 @@ public class TestPipelines {
|
||||
|
||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
for (DataNode dn : cluster.getDataNodes()) {
|
||||
Replica r = DataNodeAdapter.fetchReplicaInfo(dn, bpid, lb.get(0)
|
||||
Replica r = DataNodeTestUtils.fetchReplicaInfo(dn, bpid, lb.get(0)
|
||||
.getBlock().getBlockId());
|
||||
|
||||
assertTrue("Replica on DN " + dn + " shouldn't be null", r != null);
|
||||
|
@ -1,111 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* WARNING!! This is TEST ONLY class: it never has to be used
|
||||
* for ANY development purposes.
|
||||
*
|
||||
* This is a utility class to expose DataNode functionality for
|
||||
* unit and functional tests.
|
||||
*/
|
||||
public class DataNodeAdapter {
|
||||
/**
|
||||
* Fetch a copy of ReplicaInfo from a datanode by block id
|
||||
* @param dn datanode to retrieve a replicainfo object from
|
||||
* @param bpid Block pool Id
|
||||
* @param blkId id of the replica's block
|
||||
* @return copy of ReplicaInfo object @link{FSDataset#fetchReplicaInfo}
|
||||
*/
|
||||
public static ReplicaInfo fetchReplicaInfo (final DataNode dn,
|
||||
final String bpid,
|
||||
final long blkId) {
|
||||
return ((FSDataset)dn.data).fetchReplicaInfo(bpid, blkId);
|
||||
}
|
||||
|
||||
public static void setHeartbeatsDisabledForTests(DataNode dn,
|
||||
boolean heartbeatsDisabledForTests) {
|
||||
dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests);
|
||||
}
|
||||
|
||||
public static void triggerDeletionReport(DataNode dn) throws IOException {
|
||||
for (BPOfferService bpos : dn.getAllBpOs()) {
|
||||
bpos.triggerDeletionReportForTests();
|
||||
}
|
||||
}
|
||||
|
||||
public static void triggerHeartbeat(DataNode dn) throws IOException {
|
||||
for (BPOfferService bpos : dn.getAllBpOs()) {
|
||||
bpos.triggerHeartbeatForTests();
|
||||
}
|
||||
}
|
||||
|
||||
public static void triggerBlockReport(DataNode dn) throws IOException {
|
||||
for (BPOfferService bpos : dn.getAllBpOs()) {
|
||||
bpos.triggerBlockReportForTests();
|
||||
}
|
||||
}
|
||||
|
||||
public static long getPendingAsyncDeletions(DataNode dn) {
|
||||
FSDataset fsd = (FSDataset)dn.getFSDataset();
|
||||
return fsd.asyncDiskService.countPendingDeletions();
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert a Mockito spy object between the given DataNode and
|
||||
* the given NameNode. This can be used to delay or wait for
|
||||
* RPC calls on the datanode->NN path.
|
||||
*/
|
||||
public static DatanodeProtocolClientSideTranslatorPB spyOnBposToNN(
|
||||
DataNode dn, NameNode nn) {
|
||||
String bpid = nn.getNamesystem().getBlockPoolId();
|
||||
|
||||
BPOfferService bpos = null;
|
||||
for (BPOfferService thisBpos : dn.getAllBpOs()) {
|
||||
if (thisBpos.getBlockPoolId().equals(bpid)) {
|
||||
bpos = thisBpos;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Preconditions.checkArgument(bpos != null,
|
||||
"No such bpid: %s", bpid);
|
||||
|
||||
BPServiceActor bpsa = null;
|
||||
for (BPServiceActor thisBpsa : bpos.getBPServiceActors()) {
|
||||
if (thisBpsa.getNNSocketAddress().equals(nn.getServiceRpcAddress())) {
|
||||
bpsa = thisBpsa;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Preconditions.checkArgument(bpsa != null,
|
||||
"No service actor to NN at %s", nn.getServiceRpcAddress());
|
||||
|
||||
DatanodeProtocolClientSideTranslatorPB origNN = bpsa.getNameNodeProxy();
|
||||
DatanodeProtocolClientSideTranslatorPB spy = Mockito.spy(origNN);
|
||||
bpsa.setNameNode(spy);
|
||||
return spy;
|
||||
}
|
||||
}
|
@ -24,8 +24,13 @@ import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Utility class for accessing package-private DataNode information during tests.
|
||||
@ -42,6 +47,64 @@ public class DataNodeTestUtils {
|
||||
return dn.getDNRegistrationForBP(bpid);
|
||||
}
|
||||
|
||||
public static void setHeartbeatsDisabledForTests(DataNode dn,
|
||||
boolean heartbeatsDisabledForTests) {
|
||||
dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests);
|
||||
}
|
||||
|
||||
public static void triggerDeletionReport(DataNode dn) throws IOException {
|
||||
for (BPOfferService bpos : dn.getAllBpOs()) {
|
||||
bpos.triggerDeletionReportForTests();
|
||||
}
|
||||
}
|
||||
|
||||
public static void triggerHeartbeat(DataNode dn) throws IOException {
|
||||
for (BPOfferService bpos : dn.getAllBpOs()) {
|
||||
bpos.triggerHeartbeatForTests();
|
||||
}
|
||||
}
|
||||
|
||||
public static void triggerBlockReport(DataNode dn) throws IOException {
|
||||
for (BPOfferService bpos : dn.getAllBpOs()) {
|
||||
bpos.triggerBlockReportForTests();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert a Mockito spy object between the given DataNode and
|
||||
* the given NameNode. This can be used to delay or wait for
|
||||
* RPC calls on the datanode->NN path.
|
||||
*/
|
||||
public static DatanodeProtocolClientSideTranslatorPB spyOnBposToNN(
|
||||
DataNode dn, NameNode nn) {
|
||||
String bpid = nn.getNamesystem().getBlockPoolId();
|
||||
|
||||
BPOfferService bpos = null;
|
||||
for (BPOfferService thisBpos : dn.getAllBpOs()) {
|
||||
if (thisBpos.getBlockPoolId().equals(bpid)) {
|
||||
bpos = thisBpos;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Preconditions.checkArgument(bpos != null,
|
||||
"No such bpid: %s", bpid);
|
||||
|
||||
BPServiceActor bpsa = null;
|
||||
for (BPServiceActor thisBpsa : bpos.getBPServiceActors()) {
|
||||
if (thisBpsa.getNNSocketAddress().equals(nn.getServiceRpcAddress())) {
|
||||
bpsa = thisBpsa;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Preconditions.checkArgument(bpsa != null,
|
||||
"No service actor to NN at %s", nn.getServiceRpcAddress());
|
||||
|
||||
DatanodeProtocolClientSideTranslatorPB origNN = bpsa.getNameNodeProxy();
|
||||
DatanodeProtocolClientSideTranslatorPB spy = Mockito.spy(origNN);
|
||||
bpsa.setNameNode(spy);
|
||||
return spy;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used for testing.
|
||||
* Examples are adding and deleting blocks directly.
|
||||
@ -53,18 +116,37 @@ public class DataNodeTestUtils {
|
||||
return dn.getFSDataset();
|
||||
}
|
||||
|
||||
public static FSDataset getFsDatasetImpl(DataNode dn) {
|
||||
return (FSDataset)dn.getFSDataset();
|
||||
}
|
||||
|
||||
public static File getFile(DataNode dn, String bpid, long bid) {
|
||||
return ((FSDataset)dn.getFSDataset()).getFile(bpid, bid);
|
||||
return getFsDatasetImpl(dn).getFile(bpid, bid);
|
||||
}
|
||||
|
||||
public static File getBlockFile(DataNode dn, String bpid, Block b
|
||||
) throws IOException {
|
||||
return ((FSDataset)dn.getFSDataset()).getBlockFile(bpid, b);
|
||||
return getFsDatasetImpl(dn).getBlockFile(bpid, b);
|
||||
}
|
||||
|
||||
public static boolean unlinkBlock(DataNode dn, ExtendedBlock block, int numLinks
|
||||
) throws IOException {
|
||||
ReplicaInfo info = ((FSDataset)dn.getFSDataset()).getReplicaInfo(block);
|
||||
return info.unlinkBlock(numLinks);
|
||||
return getFsDatasetImpl(dn).getReplicaInfo(block).unlinkBlock(numLinks);
|
||||
}
|
||||
|
||||
public static long getPendingAsyncDeletions(DataNode dn) {
|
||||
return getFsDatasetImpl(dn).asyncDiskService.countPendingDeletions();
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch a copy of ReplicaInfo from a datanode by block id
|
||||
* @param dn datanode to retrieve a replicainfo object from
|
||||
* @param bpid Block pool Id
|
||||
* @param blkId id of the replica's block
|
||||
* @return copy of ReplicaInfo object @link{FSDataset#fetchReplicaInfo}
|
||||
*/
|
||||
public static ReplicaInfo fetchReplicaInfo(final DataNode dn,
|
||||
final String bpid, final long blkId) {
|
||||
return getFsDatasetImpl(dn).fetchReplicaInfo(bpid, blkId);
|
||||
}
|
||||
}
|
||||
|
@ -38,7 +38,6 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolumeSet;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
||||
|
@ -17,6 +17,17 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
@ -40,27 +51,17 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.After;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
/**
|
||||
* This test simulates a variety of situations when blocks are being
|
||||
* intentionally orrupted, unexpectedly modified, and so on before a block
|
||||
@ -561,7 +562,7 @@ public class TestBlockReport {
|
||||
// from this node.
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
DatanodeProtocolClientSideTranslatorPB spy =
|
||||
DataNodeAdapter.spyOnBposToNN(dn, nn);
|
||||
DataNodeTestUtils.spyOnBposToNN(dn, nn);
|
||||
|
||||
Mockito.doAnswer(delayer)
|
||||
.when(spy).blockReport(
|
||||
|
@ -27,7 +27,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
|
||||
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
|
||||
@ -83,8 +83,8 @@ public class HAStressTestHarness {
|
||||
@Override
|
||||
public void doAnAction() throws Exception {
|
||||
for (DataNode dn : cluster.getDataNodes()) {
|
||||
DataNodeAdapter.triggerDeletionReport(dn);
|
||||
DataNodeAdapter.triggerHeartbeat(dn);
|
||||
DataNodeTestUtils.triggerDeletionReport(dn);
|
||||
DataNodeTestUtils.triggerHeartbeat(dn);
|
||||
}
|
||||
for (int i = 0; i < 2; i++) {
|
||||
NameNode nn = cluster.getNameNode(i);
|
||||
|
@ -37,7 +37,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
@ -96,7 +96,7 @@ public abstract class HATestUtil {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
for (DataNode dn : cluster.getDataNodes()) {
|
||||
if (DataNodeAdapter.getPendingAsyncDeletions(dn) > 0) {
|
||||
if (DataNodeTestUtils.getPendingAsyncDeletions(dn) > 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -17,7 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
@ -47,7 +47,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
@ -61,7 +60,6 @@ import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
@ -72,9 +70,7 @@ import com.google.common.collect.Lists;
|
||||
|
||||
public class TestDNFencing {
|
||||
|
||||
protected static final Log LOG = LogFactory.getLog(
|
||||
TestDNFencing.class);
|
||||
private static final String TEST_FILE_DATA = "hello highly available world";
|
||||
protected static final Log LOG = LogFactory.getLog(TestDNFencing.class);
|
||||
private static final String TEST_FILE = "/testStandbyIsHot";
|
||||
private static final Path TEST_FILE_PATH = new Path(TEST_FILE);
|
||||
private static final int SMALL_BLOCK = 1024;
|
||||
@ -497,7 +493,7 @@ public class TestDNFencing {
|
||||
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
DatanodeProtocolClientSideTranslatorPB spy =
|
||||
DataNodeAdapter.spyOnBposToNN(dn, nn2);
|
||||
DataNodeTestUtils.spyOnBposToNN(dn, nn2);
|
||||
|
||||
Mockito.doAnswer(delayer)
|
||||
.when(spy).blockReport(
|
||||
|
@ -17,7 +17,10 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
@ -45,7 +48,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
@ -54,9 +57,7 @@ import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
|
||||
import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
|
||||
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
|
||||
|
||||
import org.apache.log4j.Level;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
@ -297,7 +298,7 @@ public class TestPipelinesFailover {
|
||||
// active.
|
||||
DataNode primaryDN = cluster.getDataNode(expectedPrimary.getIpcPort());
|
||||
DatanodeProtocolClientSideTranslatorPB nnSpy =
|
||||
DataNodeAdapter.spyOnBposToNN(primaryDN, nn0);
|
||||
DataNodeTestUtils.spyOnBposToNN(primaryDN, nn0);
|
||||
|
||||
// Delay the commitBlockSynchronization call
|
||||
DelayAnswer delayer = new DelayAnswer(LOG);
|
||||
|
@ -35,14 +35,14 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
@ -225,7 +225,7 @@ public class TestStandbyIsHot {
|
||||
LOG.info("Got " + numReplicas + " locs: " + locs);
|
||||
if (numReplicas > expectedReplicas) {
|
||||
for (DataNode dn : cluster.getDataNodes()) {
|
||||
DataNodeAdapter.triggerDeletionReport(dn);
|
||||
DataNodeTestUtils.triggerDeletionReport(dn);
|
||||
}
|
||||
}
|
||||
return numReplicas == expectedReplicas;
|
||||
|
Loading…
x
Reference in New Issue
Block a user