HADOOP-19164. Hadoop CLI MiniCluster is broken (#7050). Contributed by Ayush Saxena.
Reviewed-by: Vinayakumar B <vinayakumarb@apache.org>
Parent: 6bcc254123
Commit: 28538d628e
@@ -32,8 +32,6 @@ You should be able to obtain the Hadoop tarball from the release. Also, you can
     $ mvn clean install -DskipTests
     $ mvn package -Pdist -Dtar -DskipTests -Dmaven.javadoc.skip
 
-**NOTE:** You will need [protoc 2.5.0](http://code.google.com/p/protobuf/) installed.
-
 The tarball should be available in `hadoop-dist/target/` directory.
 
 Running the MiniCluster

@@ -41,9 +39,9 @@ Running the MiniCluster
 
 From inside the root directory of the extracted tarball, you can start the CLI MiniCluster using the following command:
 
-    $ bin/mapred minicluster -rmport RM_PORT -jhsport JHS_PORT
+    $ bin/mapred minicluster -format
 
-In the example command above, `RM_PORT` and `JHS_PORT` should be replaced by the user's choice of these port numbers. If not specified, random free ports will be used.
+The format option is required when running the minicluster for the first time, from next time -format option isn't required.
 
 There are a number of command line arguments that the users can use to control which services to start, and to pass other configuration properties. The available command line arguments:
 
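For concreteness, a hedged illustration of the workflow the new documentation text describes. The `-format` flag and the bare first command come straight from the diff above; the `-rmport`/`-jhsport` options and the port values are taken from the old example text and are only assumed to still be accepted alongside `-format`:

    $ bin/mapred minicluster -format -rmport 8032 -jhsport 10020    # first run: formats the MiniCluster storage
    $ bin/mapred minicluster -rmport 8032 -jhsport 10020            # later runs: -format can be omitted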
@@ -84,6 +84,7 @@
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;

@@ -201,7 +202,7 @@ public void testServerDefaultsWithCaching()
     cluster.waitActive();
     // Set a spy namesystem inside the namenode and return it
     FSNamesystem spyNamesystem =
-        NameNodeAdapter.spyOnNamesystem(cluster.getNameNode());
+        NameNodeAdapterMockitoUtil.spyOnNamesystem(cluster.getNameNode());
     InetSocketAddress nameNodeAddr = cluster.getNameNode().getNameNodeAddress();
     try {
       // Create a dfs client and set a long enough validity interval

@@ -252,7 +253,7 @@ public void testServerDefaultsWithMinimalCaching() throws Exception {
     cluster.waitActive();
     // Set a spy namesystem inside the namenode and return it
     FSNamesystem spyNamesystem =
-        NameNodeAdapter.spyOnNamesystem(cluster.getNameNode());
+        NameNodeAdapterMockitoUtil.spyOnNamesystem(cluster.getNameNode());
     InetSocketAddress nameNodeAddr = cluster.getNameNode().getNameNodeAddress();
     try {
       // Create a dfs client and set a minimal validity interval
@@ -37,7 +37,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
 import org.apache.hadoop.test.MockitoUtil;
 import org.apache.hadoop.util.Time;
 import org.junit.Assert;

@@ -297,7 +297,8 @@ public void testGetBlockLocationsOnlyUsesReadLock() throws IOException {
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(0)
         .build();
-    ReentrantReadWriteLock spyLock = NameNodeAdapter.spyOnFsLock(cluster.getNamesystem());
+    ReentrantReadWriteLock spyLock =
+        NameNodeAdapterMockitoUtil.spyOnFsLock(cluster.getNamesystem());
     try {
       // Create empty file in the FSN.
       Path p = new Path("/empty-file");
@@ -111,7 +111,7 @@
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.io.IOUtils;

@@ -1877,7 +1877,7 @@ public Void run() throws Exception {
   }
 
   private void spyFSNamesystem(NameNode nn) throws IOException {
-    FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn);
+    FSNamesystem fsnSpy = NameNodeAdapterMockitoUtil.spyOnNamesystem(nn);
     doAnswer(new Answer<BlocksWithLocations>() {
       @Override
       public BlocksWithLocations answer(InvocationOnMock invocation)
@@ -51,7 +51,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;

@@ -259,7 +259,7 @@ private void testBalancerWithObserver(boolean withObserverFailure)
     List<FSNamesystem> namesystemSpies = new ArrayList<>();
     for (int i = 0; i < cluster.getNumNameNodes(); i++) {
       namesystemSpies.add(
-          NameNodeAdapter.spyOnNamesystem(cluster.getNameNode(i)));
+          NameNodeAdapterMockitoUtil.spyOnNamesystem(cluster.getNameNode(i)));
     }
     if (withObserverFailure) {
       // First observer NN is at index 2
@@ -19,21 +19,15 @@
 
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;

@@ -47,7 +41,6 @@
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
-import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;

@@ -57,11 +50,6 @@
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.test.Whitebox;
-import org.mockito.ArgumentMatcher;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import static org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer.FSIMAGE_ATTRIBUTE_KEY;
 

@@ -269,97 +257,6 @@ public static BlockInfo getStoredBlock(final FSNamesystem fsn,
     return fsn.getStoredBlock(b);
   }
 
-  public static FSNamesystem spyOnNamesystem(NameNode nn) {
-    FSNamesystem fsnSpy = Mockito.spy(nn.getNamesystem());
-    FSNamesystem fsnOld = nn.namesystem;
-    fsnOld.writeLock();
-    fsnSpy.writeLock();
-    nn.namesystem = fsnSpy;
-    try {
-      FieldUtils.writeDeclaredField(
-          (NameNodeRpcServer)nn.getRpcServer(), "namesystem", fsnSpy, true);
-      FieldUtils.writeDeclaredField(
-          fsnSpy.getBlockManager(), "namesystem", fsnSpy, true);
-      FieldUtils.writeDeclaredField(
-          fsnSpy.getLeaseManager(), "fsnamesystem", fsnSpy, true);
-      FieldUtils.writeDeclaredField(
-          fsnSpy.getBlockManager().getDatanodeManager(),
-          "namesystem", fsnSpy, true);
-      FieldUtils.writeDeclaredField(
-          BlockManagerTestUtil.getHeartbeatManager(fsnSpy.getBlockManager()),
-          "namesystem", fsnSpy, true);
-    } catch (IllegalAccessException e) {
-      throw new RuntimeException("Cannot set spy FSNamesystem", e);
-    } finally {
-      fsnSpy.writeUnlock();
-      fsnOld.writeUnlock();
-    }
-    return fsnSpy;
-  }
-
-  public static BlockManager spyOnBlockManager(NameNode nn) {
-    BlockManager bmSpy = Mockito.spy(nn.getNamesystem().getBlockManager());
-    nn.getNamesystem().setBlockManagerForTesting(bmSpy);
-    return bmSpy;
-  }
-
-  public static ReentrantReadWriteLock spyOnFsLock(FSNamesystem fsn) {
-    ReentrantReadWriteLock spy = Mockito.spy(fsn.getFsLockForTests());
-    fsn.setFsLockForTests(spy);
-    return spy;
-  }
-
-  public static FSImage spyOnFsImage(NameNode nn1) {
-    FSNamesystem fsn = nn1.getNamesystem();
-    FSImage spy = Mockito.spy(fsn.getFSImage());
-    Whitebox.setInternalState(fsn, "fsImage", spy);
-    return spy;
-  }
-
-  public static FSEditLog spyOnEditLog(NameNode nn) {
-    FSEditLog spyEditLog = spy(nn.getNamesystem().getFSImage().getEditLog());
-    DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
-    EditLogTailer tailer = nn.getNamesystem().getEditLogTailer();
-    if (tailer != null) {
-      tailer.setEditLog(spyEditLog);
-    }
-    return spyEditLog;
-  }
-
-  /**
-   * Spy on EditLog to delay execution of doEditTransaction() for MkdirOp.
-   */
-  public static FSEditLog spyDelayMkDirTransaction(
-      final NameNode nn, final long delay) {
-    FSEditLog realEditLog = nn.getFSImage().getEditLog();
-    FSEditLogAsync spyEditLog = (FSEditLogAsync) spy(realEditLog);
-    DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
-    Answer<Boolean> ans = new Answer<Boolean>() {
-      @Override
-      public Boolean answer(InvocationOnMock invocation) throws Throwable {
-        Thread.sleep(delay);
-        return (Boolean) invocation.callRealMethod();
-      }
-    };
-    ArgumentMatcher<FSEditLogOp> am = new ArgumentMatcher<FSEditLogOp>() {
-      @Override
-      public boolean matches(FSEditLogOp argument) {
-        FSEditLogOp op = (FSEditLogOp) argument;
-        return op.opCode == FSEditLogOpCodes.OP_MKDIR;
-      }
-    };
-    doAnswer(ans).when(spyEditLog).doEditTransaction(
-        ArgumentMatchers.argThat(am));
-    return spyEditLog;
-  }
-
-  public static JournalSet spyOnJournalSet(NameNode nn) {
-    FSEditLog editLog = nn.getFSImage().getEditLog();
-    JournalSet js = Mockito.spy(editLog.getJournalSet());
-    editLog.setJournalSetForTesting(js);
-    return js;
-  }
-
   public static String getMkdirOpPath(FSEditLogOp op) {
     if (op.opCode == FSEditLogOpCodes.OP_MKDIR) {
       return ((MkdirOp) op).path;
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
+import org.mockito.stubbing.Answer;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
+import org.apache.hadoop.test.Whitebox;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+/**
+ * This is a Mockito based utility class to expose NameNode functionality for unit tests.
+ */
+public final class NameNodeAdapterMockitoUtil {
+
+  private NameNodeAdapterMockitoUtil() {
+  }
+
+  public static BlockManager spyOnBlockManager(NameNode nn) {
+    BlockManager bmSpy = spy(nn.getNamesystem().getBlockManager());
+    nn.getNamesystem().setBlockManagerForTesting(bmSpy);
+    return bmSpy;
+  }
+
+  public static ReentrantReadWriteLock spyOnFsLock(FSNamesystem fsn) {
+    ReentrantReadWriteLock spy = spy(fsn.getFsLockForTests());
+    fsn.setFsLockForTests(spy);
+    return spy;
+  }
+
+  public static FSImage spyOnFsImage(NameNode nn1) {
+    FSNamesystem fsn = nn1.getNamesystem();
+    FSImage spy = spy(fsn.getFSImage());
+    Whitebox.setInternalState(fsn, "fsImage", spy);
+    return spy;
+  }
+
+  public static JournalSet spyOnJournalSet(NameNode nn) {
+    FSEditLog editLog = nn.getFSImage().getEditLog();
+    JournalSet js = spy(editLog.getJournalSet());
+    editLog.setJournalSetForTesting(js);
+    return js;
+  }
+
+  public static FSNamesystem spyOnNamesystem(NameNode nn) {
+    FSNamesystem fsnSpy = spy(nn.getNamesystem());
+    FSNamesystem fsnOld = nn.namesystem;
+    fsnOld.writeLock();
+    fsnSpy.writeLock();
+    nn.namesystem = fsnSpy;
+    try {
+      FieldUtils.writeDeclaredField(nn.getRpcServer(), "namesystem", fsnSpy, true);
+      FieldUtils.writeDeclaredField(
+          fsnSpy.getBlockManager(), "namesystem", fsnSpy, true);
+      FieldUtils.writeDeclaredField(
+          fsnSpy.getLeaseManager(), "fsnamesystem", fsnSpy, true);
+      FieldUtils.writeDeclaredField(
+          fsnSpy.getBlockManager().getDatanodeManager(),
+          "namesystem", fsnSpy, true);
+      FieldUtils.writeDeclaredField(
+          BlockManagerTestUtil.getHeartbeatManager(fsnSpy.getBlockManager()),
+          "namesystem", fsnSpy, true);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException("Cannot set spy FSNamesystem", e);
+    } finally {
+      fsnSpy.writeUnlock();
+      fsnOld.writeUnlock();
+    }
+    return fsnSpy;
+  }
+
+  public static FSEditLog spyOnEditLog(NameNode nn) {
+    FSEditLog spyEditLog = spy(nn.getNamesystem().getFSImage().getEditLog());
+    DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
+    EditLogTailer tailer = nn.getNamesystem().getEditLogTailer();
+    if (tailer != null) {
+      tailer.setEditLog(spyEditLog);
+    }
+    return spyEditLog;
+  }
+
+  /**
+   * Spy on EditLog to delay execution of doEditTransaction() for MkdirOp.
+   */
+  public static FSEditLog spyDelayMkDirTransaction(
+      final NameNode nn, final long delay) {
+    FSEditLog realEditLog = nn.getFSImage().getEditLog();
+    FSEditLogAsync spyEditLog = (FSEditLogAsync) spy(realEditLog);
+    DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
+    Answer<Boolean> ans = invocation -> {
+      Thread.sleep(delay);
+      return (Boolean) invocation.callRealMethod();
+    };
+    ArgumentMatcher<FSEditLogOp> am = argument -> argument.opCode == FSEditLogOpCodes.OP_MKDIR;
+    doAnswer(ans).when(spyEditLog).doEditTransaction(ArgumentMatchers.argThat(am));
+    return spyEditLog;
+  }
+}
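To show how the updated tests in this commit consume the new utility, here is a minimal hypothetical JUnit sketch. The test class name and the assertions are invented; the calls to the MiniDFSCluster, spyOnNamesystem, and the getServerDefaults stub mirror the patterns visible in the hunks above and below (for example the testFsserverDefaultsBackwardsCompatible change further down):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
    import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
    import org.junit.Test;
    import org.mockito.Mockito;

    // Hypothetical test class, not part of the commit; it only mirrors the call
    // pattern the modified tests use.
    public class TestNameNodeSpySketch {

      @Test
      public void stubsServerDefaultsOnTheSpiedNamesystem() throws Exception {
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration()).build();
        try {
          cluster.waitActive();
          // Swap a Mockito spy in place of the live FSNamesystem ...
          FSNamesystem fsnSpy =
              NameNodeAdapterMockitoUtil.spyOnNamesystem(cluster.getNameNode());
          // ... then stub behaviour on the spy, as the tests in this commit do.
          Mockito.when(fsnSpy.getServerDefaults())
              .thenThrow(new UnsupportedOperationException());
          // A real test would now exercise a client call and assert the fallback path.
        } finally {
          cluster.shutdown();
        }
      }
    }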
@@ -50,6 +50,7 @@
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ExitUtil.ExitException;
 import org.junit.After;

@@ -336,7 +337,7 @@ public void testFailureToReadEditsOnTransitionToActive() throws Exception {
   }
 
   private LimitedEditLogAnswer causeFailureOnEditLogRead() throws IOException {
-    FSEditLog spyEditLog = NameNodeAdapter.spyOnEditLog(nn1);
+    FSEditLog spyEditLog = NameNodeAdapterMockitoUtil.spyOnEditLog(nn1);
     LimitedEditLogAnswer answer = new LimitedEditLogAnswer();
     doAnswer(answer).when(spyEditLog).selectInputStreams(
         anyLong(), anyLong(), any(), anyBoolean(), anyBoolean());
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;

@@ -241,7 +242,7 @@ public void testTransitionSynchronization() throws Exception {
         .build();
     try {
       cluster.waitActive();
-      ReentrantReadWriteLock spyLock = NameNodeAdapter.spyOnFsLock(
+      ReentrantReadWriteLock spyLock = NameNodeAdapterMockitoUtil.spyOnFsLock(
           cluster.getNameNode(0).getNamesystem());
       Mockito.doAnswer(new GenericTestUtils.SleepAnswer(50))
          .when(spyLock).writeLock();
@@ -65,7 +65,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
 import org.apache.hadoop.hdfs.server.namenode.TestFsck;
 import org.apache.hadoop.hdfs.tools.GetGroups;

@@ -422,7 +422,7 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
     // Mock block manager for observer to generate some fake blocks which
     // will trigger the (retriable) safe mode exception.
     BlockManager bmSpy =
-        NameNodeAdapter.spyOnBlockManager(dfsCluster.getNameNode(2));
+        NameNodeAdapterMockitoUtil.spyOnBlockManager(dfsCluster.getNameNode(2));
     doAnswer((invocation) -> {
       ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
       LocatedBlock fakeBlock = new LocatedBlock(b, DatanodeInfo.EMPTY_ARRAY);

@@ -457,7 +457,7 @@ public void testObserverNodeBlockMissingRetry() throws Exception {
     // Mock block manager for observer to generate some fake blocks which
     // will trigger the block missing exception.
 
-    BlockManager bmSpy = NameNodeAdapter
+    BlockManager bmSpy = NameNodeAdapterMockitoUtil
         .spyOnBlockManager(dfsCluster.getNameNode(2));
     doAnswer((invocation) -> {
       List<LocatedBlock> fakeBlocks = new ArrayList<>();

@@ -626,7 +626,7 @@ public void testMkdirsRaceWithObserverRead() throws Exception {
     assertSentTo(2);
 
     // Create a spy on FSEditLog, which delays MkdirOp transaction by 100 mec
-    FSEditLog spyEditLog = NameNodeAdapter.spyDelayMkDirTransaction(
+    FSEditLog spyEditLog = NameNodeAdapterMockitoUtil.spyDelayMkDirTransaction(
         dfsCluster.getNameNode(0), 100);
 
     final int numThreads = 4;
@@ -157,7 +157,7 @@ public void shutdownCluster() throws IOException {
 
   @Test(timeout = 300000)
   public void testSBNCheckpoints() throws Exception {
-    JournalSet standbyJournalSet = NameNodeAdapter.spyOnJournalSet(nns[1]);
+    JournalSet standbyJournalSet = NameNodeAdapterMockitoUtil.spyOnJournalSet(nns[1]);
 
     doEdits(0, 10);
     HATestUtil.waitForStandbyToCatchUp(nns[0], nns[1]);

@@ -350,7 +350,7 @@ public void testCheckpointWhenNoNewTransactionsHappened()
     cluster.restartNameNode(1);
     nns[1] = cluster.getNameNode(1);
 
-    FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nns[1]);
+    FSImage spyImage1 = NameNodeAdapterMockitoUtil.spyOnFsImage(nns[1]);
 
     // We shouldn't save any checkpoints at txid=0
     Thread.sleep(1000);

@@ -486,7 +486,7 @@ public Boolean get() {
   public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {
 
     // Set it up so that we know when the SBN checkpoint starts and ends.
-    FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nns[1]);
+    FSImage spyImage1 = NameNodeAdapterMockitoUtil.spyOnFsImage(nns[1]);
     DelayAnswer answerer = new DelayAnswer(LOG);
     Mockito.doAnswer(answerer).when(spyImage1)
         .saveNamespace(any(FSNamesystem.class),

@@ -531,7 +531,7 @@ public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {
   public void testReadsAllowedDuringCheckpoint() throws Exception {
 
     // Set it up so that we know when the SBN checkpoint starts and ends.
-    FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nns[1]);
+    FSImage spyImage1 = NameNodeAdapterMockitoUtil.spyOnFsImage(nns[1]);
     DelayAnswer answerer = new DelayAnswer(LOG);
     Mockito.doAnswer(answerer).when(spyImage1)
         .saveNamespace(any(FSNamesystem.class),
@@ -65,6 +65,7 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -115,7 +116,6 @@
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;

@@ -2007,7 +2007,7 @@ public void testFsserverDefaultsBackwardsCompatible() throws Exception {
     final WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
         WebHdfsConstants.WEBHDFS_SCHEME);
     FSNamesystem fsnSpy =
-        NameNodeAdapter.spyOnNamesystem(cluster.getNameNode());
+        NameNodeAdapterMockitoUtil.spyOnNamesystem(cluster.getNameNode());
     Mockito.when(fsnSpy.getServerDefaults())
         .thenThrow(new UnsupportedOperationException());
     try {
@@ -112,7 +112,7 @@ private Options makeOptions() {
             Option.builder("writeConfig").hasArg().argName("path").desc(
                 "Save configuration to this XML file.").build())
         .addOption(
-            Option.builder("writeDetails").argName("path").desc(
+            Option.builder("writeDetails").hasArg().argName("path").desc(
                 "Write basic information to this JSON file.").build())
         .addOption(
             Option.builder("help").desc("Prints option help.").build());