Merge trunk into HA branch.
- Some conflicts on BlockManager due to the recent commit of HDFS-2822; moved logic to the processMisReplicatedBlock function.
- testNoExtensionIfNoBlocks (org.apache.hadoop.hdfs.TestSafeMode) is now failing due to an existing bug in the HA branch, to be fixed soon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1235078 13f79535-47bb-0310-9956-ffa450edef68
commit fa4a33a566
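The BlockManager conflict noted in the commit message comes down to the per-block decision logic having been pulled out of the scan loop by HDFS-2822. A simplified sketch of the resulting shape (names taken from the BlockManager hunks further down; bodies, locking, and logging elided, so this is not the verbatim implementation):

    // Simplified sketch only; the real code is in the BlockManager diff below.
    public void processMisReplicatedBlocks() {
      long nrInvalid = 0, nrUnderReplicated = 0, nrOverReplicated = 0,
           nrPostponed = 0, nrUnderConstruction = 0;
      for (BlockInfo block : blocksMap.getBlocks()) {
        // Every per-block decision now lives in one helper, so trunk and the
        // HA branch only had to reconcile this switch during the merge.
        switch (processMisReplicatedBlock(block)) {
          case INVALID:            nrInvalid++; break;
          case UNDER_REPLICATED:   nrUnderReplicated++; break;
          case OVER_REPLICATED:    nrOverReplicated++; break;
          case POSTPONE:           nrPostponed++; break;
          case UNDER_CONSTRUCTION: nrUnderConstruction++; break;
          case OK:
          default:                 break;
        }
      }
    }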
@@ -285,6 +285,9 @@ Release 0.23.1 - Unreleased
     HADOOP-7986. Adding config for MapReduce History Server protocol in
     hadoop-policy.xml for service level authorization. (Mahadev Konar via vinodkv)
 
+    HADOOP-7981. Improve documentation for org.apache.hadoop.io.compress.
+    Decompressor.getRemaining (Jonathan Eagles via mahadev)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES
@@ -49,7 +49,7 @@ public interface Decompressor {
   public void setInput(byte[] b, int off, int len);
 
   /**
-   * Returns true if the input data buffer is empty and
+   * Returns <code>true</code> if the input data buffer is empty and
    * {@link #setInput(byte[], int, int)} should be called to
    * provide more input.
    *
@@ -76,8 +76,11 @@ public interface Decompressor {
   public boolean needsDictionary();
 
   /**
-   * Returns true if the end of the decompressed
-   * data output stream has been reached.
+   * Returns <code>true</code> if the end of the decompressed
+   * data output stream has been reached. Indicates a concatenated data stream
+   * when finished() returns <code>true</code> and {@link #getRemaining()}
+   * returns a positive value. finished() will be reset with the
+   * {@link #reset()} method.
    * @return <code>true</code> if the end of the decompressed
    * data output stream has been reached.
    */
@@ -98,15 +101,23 @@ public interface Decompressor {
   public int decompress(byte[] b, int off, int len) throws IOException;
 
   /**
-   * Returns the number of bytes remaining in the compressed-data buffer;
-   * typically called after the decompressor has finished decompressing
-   * the current gzip stream (a.k.a. "member").
+   * Returns the number of bytes remaining in the compressed data buffer.
+   * Indicates a concatenated data stream if {@link #finished()} returns
+   * <code>true</code> and getRemaining() returns a positive value. If
+   * {@link #finished()} returns <code>true</code> and getRemaining() returns
+   * a zero value, indicates that the end of data stream has been reached and
+   * is not a concatenated data stream.
+   * @return The number of bytes remaining in the compressed data buffer.
    */
   public int getRemaining();
 
   /**
    * Resets decompressor and input and output buffers so that a new set of
-   * input data can be processed.
+   * input data can be processed. If {@link #finished()}} returns
+   * <code>true</code> and {@link #getRemaining()} returns a positive value,
+   * reset() is called before processing of the next data stream in the
+   * concatenated data stream. {@link #finished()} will be reset and will
+   * return <code>false</code> when reset() is called.
    */
   public void reset();
 
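The reworded getRemaining()/finished()/reset() javadoc above describes a caller-side protocol for detecting concatenated gzip members. A minimal sketch of a read loop written against that contract, assuming a Decompressor obtained from a codec elsewhere and plain java.io streams (illustration only, not code from this patch):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.io.compress.Decompressor;

    class ConcatenatedStreamSketch {
      /** Sketch only: drain possibly-concatenated members per the contract above. */
      static void drain(Decompressor d, InputStream rawIn, OutputStream out)
          throws IOException {
        byte[] in = new byte[64 * 1024];
        byte[] buf = new byte[64 * 1024];
        int inLen = rawIn.read(in);
        if (inLen <= 0) {
          return;
        }
        d.setInput(in, 0, inLen);
        while (true) {
          int n = d.decompress(buf, 0, buf.length);
          if (n > 0) {
            out.write(buf, 0, n);
          }
          if (d.finished()) {
            int remaining = d.getRemaining();
            if (remaining > 0) {
              // finished() plus leftover input indicates another concatenated
              // member: reset, then re-feed the unconsumed tail of the buffer.
              d.reset();
              d.setInput(in, inLen - remaining, remaining);
            } else {
              break; // genuine end of the data stream
            }
          } else if (d.needsInput()) {
            inLen = rawIn.read(in);
            if (inLen <= 0) {
              break; // ran out of compressed data
            }
            d.setInput(in, 0, inLen);
          }
        }
      }
    }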
@@ -290,6 +290,12 @@ Release 0.23.1 - UNRELEASED
     for a client on the same node as the block file. (Andrew Purtell,
     Suresh Srinivas and Jitendra Nath Pandey via szetszwo)
 
+    HDFS-2825. Add test hook to turn off the writer preferring its local
+    DN. (todd)
+
+    HDFS-2826. Add test case for HDFS-1476 (safemode can initialize
+    replication queues before exiting) (todd)
+
   BUG FIXES
 
     HDFS-2541. For a sufficiently large value of blocks, the DN Scanner
@@ -350,6 +356,9 @@ Release 0.23.1 - UNRELEASED
     HDFS-2816. Fix missing license header in httpfs findbugsExcludeFile.xml.
     (hitesh via tucu)
 
+    HDFS-2822. processMisReplicatedBlock incorrectly identifies
+    under-construction blocks as under-replicated. (todd)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES
@@ -1877,7 +1877,8 @@ private void invalidateCorruptReplicas(Block blk) {
   public void processMisReplicatedBlocks() {
     assert namesystem.hasWriteLock();
 
-    long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0, nrPostponed = 0;
+    long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0, nrPostponed = 0,
+         nrUnderConstruction = 0;
     neededReplications.clear();
     for (BlockInfo block : blocksMap.getBlocks()) {
       MisReplicationResult res = processMisReplicatedBlock(block);
@@ -1896,6 +1897,9 @@ public void processMisReplicatedBlocks() {
         nrPostponed++;
         postponeBlock(block);
         break;
+      case UNDER_CONSTRUCTION:
+        nrUnderConstruction++;
+        break;
       case OK:
         break;
       default:
@@ -1908,6 +1912,7 @@ public void processMisReplicatedBlocks() {
     LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
     LOG.info("Number of over-replicated blocks = " + nrOverReplicated +
         ((nrPostponed > 0) ? ( " (" + nrPostponed + " postponed)") : ""));
+    LOG.info("Number of blocks being written = " + nrUnderConstruction);
   }
 
   /**
@@ -1922,6 +1927,11 @@ private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
       addToInvalidates(block);
       return MisReplicationResult.INVALID;
     }
+    if (!block.isComplete()) {
+      // Incomplete blocks are never considered mis-replicated --
+      // they'll be reached when they are completed or recovered.
+      return MisReplicationResult.UNDER_CONSTRUCTION;
+    }
     // calculate current replication
     short expectedReplication = fileINode.getReplication();
     NumberReplicas num = countNodes(block);
@@ -2797,6 +2807,8 @@ enum MisReplicationResult {
     OVER_REPLICATED,
     /** A decision can't currently be made about this block. */
     POSTPONE,
+    /** The block is under construction, so should be ignored */
+    UNDER_CONSTRUCTION,
     /** The block is properly replicated */
     OK
   }
@@ -38,6 +38,8 @@
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /** The class is responsible for choosing the desired number of targets
  * for placing block replicas.
  * The replica placement strategy is that if the writer is on a datanode,
@@ -49,6 +51,7 @@
 @InterfaceAudience.Private
 public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   private boolean considerLoad;
+  private boolean preferLocalNode = true;
   private NetworkTopology clusterMap;
   private FSClusterStats stats;
   static final String enableDebugLogging = "For more information, please enable"
@@ -223,17 +226,17 @@ private DatanodeDescriptor chooseLocalNode(
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes,
           blocksize, maxNodesPerRack, results);
 
-    // otherwise try local machine first
-    Node oldNode = excludedNodes.put(localMachine, localMachine);
-    if (oldNode == null) { // was not in the excluded list
-      if (isGoodTarget(localMachine, blocksize,
-          maxNodesPerRack, false, results)) {
-        results.add(localMachine);
-        return localMachine;
-      }
-    }
-
+    if (preferLocalNode) {
+      // otherwise try local machine first
+      Node oldNode = excludedNodes.put(localMachine, localMachine);
+      if (oldNode == null) { // was not in the excluded list
+        if (isGoodTarget(localMachine, blocksize,
+            maxNodesPerRack, false, results)) {
+          results.add(localMachine);
+          return localMachine;
+        }
+      }
+    }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes,
         blocksize, maxNodesPerRack, results);
@@ -568,5 +571,10 @@ public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
     }
     return cur;
   }
+
+  @VisibleForTesting
+  void setPreferLocalNode(boolean prefer) {
+    this.preferLocalNode = prefer;
+  }
 
 }
@@ -196,6 +196,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /***************************************************
  * FSNamesystem does the actual bookkeeping work for the
  * DataNode.
@@ -3251,7 +3253,7 @@ class SafeModeInfo {
     /** Total number of blocks. */
     int blockTotal;
     /** Number of safe blocks. */
-    private int blockSafe;
+    int blockSafe;
     /** Number of blocks needed to satisfy safe mode threshold condition */
     private int blockThreshold;
     /** Number of blocks needed before populating replication queues */
@@ -3259,7 +3261,7 @@ class SafeModeInfo {
     /** time of the last status printout */
     private long lastStatusReport = 0;
     /** flag indicating whether replication queues have been initialized */
-    private boolean initializedReplQueues = false;
+    boolean initializedReplQueues = false;
     /** Was safemode entered automatically because available resources were low. */
     private boolean resourcesLow = false;
 
@@ -3384,9 +3386,7 @@ private synchronized void leave(boolean checkForUpgrades) {
      */
     private synchronized void initializeReplQueues() {
       LOG.info("initializing replication queues");
-      if (isPopulatingReplQueues()) {
-        LOG.warn("Replication queues already initialized.");
-      }
+      assert !isPopulatingReplQueues() : "Already initialized repl queues";
       long startTimeMisReplicatedScan = now();
       blockManager.processMisReplicatedBlocks();
       initializedReplQueues = true;
@@ -4975,4 +4975,9 @@ void setFsLockForTests(ReentrantReadWriteLock lock) {
   ReentrantReadWriteLock getFsLockForTests() {
     return fsLock;
   }
+
+  @VisibleForTesting
+  public SafeModeInfo getSafeModeInfoForTests() {
+    return safeMode;
+  }
 }
@@ -19,22 +19,37 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.test.GenericTestUtils;
 
 import static org.junit.Assert.*;
 import org.junit.Before;
 import org.junit.After;
 import org.junit.Test;
 
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+
 /**
  * Tests to verify safe mode correctness.
  */
 public class TestSafeMode {
+  private static final Path TEST_PATH = new Path("/test");
+  private static final int BLOCK_SIZE = 1024;
   Configuration conf;
   MiniDFSCluster cluster;
   FileSystem fs;
@@ -43,6 +58,7 @@ public class TestSafeMode {
   @Before
   public void startUp() throws IOException {
     conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
@@ -83,7 +99,7 @@ public void testManualSafeMode() throws IOException {
 
     // create two files with one block each.
     DFSTestUtil.createFile(fs, file1, 1000, (short)1, 0);
-    DFSTestUtil.createFile(fs, file2, 2000, (short)1, 0);
+    DFSTestUtil.createFile(fs, file2, 1000, (short)1, 0);
     fs.close();
     cluster.shutdown();
 
@@ -127,6 +143,106 @@ public void testNoExtensionIfNoBlocks() throws IOException {
     String status = cluster.getNameNode().getNamesystem().getSafemode();
     assertEquals("", status);
   }
+
+  /**
+   * Test that the NN initializes its under-replicated blocks queue
+   * before it is ready to exit safemode (HDFS-1476)
+   */
+  @Test(timeout=45000)
+  public void testInitializeReplQueuesEarly() throws Exception {
+    // Spray the blocks around the cluster when we add DNs instead of
+    // concentrating all blocks on the first node.
+    BlockManagerTestUtil.setWritingPrefersLocalNode(
+        cluster.getNamesystem().getBlockManager(), false);
+
+    cluster.startDataNodes(conf, 2, true, StartupOption.REGULAR, null);
+    cluster.waitActive();
+    DFSTestUtil.createFile(fs, TEST_PATH, 15*BLOCK_SIZE, (short)1, 1L);
+
+    List<DataNodeProperties> dnprops = Lists.newLinkedList();
+    dnprops.add(cluster.stopDataNode(0));
+    dnprops.add(cluster.stopDataNode(0));
+    dnprops.add(cluster.stopDataNode(0));
+
+    cluster.getConfiguration(0).setFloat(
+        DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY, 1f/15f);
+
+    cluster.restartNameNode();
+    final NameNode nn = cluster.getNameNode();
+
+    String status = nn.getNamesystem().getSafemode();
+    assertEquals("Safe mode is ON.The reported blocks 0 needs additional " +
+        "15 blocks to reach the threshold 0.9990 of total blocks 15. " +
+        "Safe mode will be turned off automatically.", status);
+    assertFalse("Mis-replicated block queues should not be initialized " +
+        "until threshold is crossed",
+        NameNodeAdapter.safeModeInitializedReplQueues(nn));
+
+    cluster.restartDataNode(dnprops.remove(0));
+
+    // Wait for the block report from the restarted DN to come in.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return NameNodeAdapter.getSafeModeSafeBlocks(nn) > 0;
+      }
+    }, 10, 10000);
+    // SafeMode is fine-grain synchronized, so the processMisReplicatedBlocks
+    // call is still going on at this point - wait until it's done by grabbing
+    // the lock.
+    nn.getNamesystem().writeLock();
+    nn.getNamesystem().writeUnlock();
+    int safe = NameNodeAdapter.getSafeModeSafeBlocks(nn);
+    assertTrue("Expected first block report to make some but not all blocks " +
+        "safe. Got: " + safe, safe >= 1 && safe < 15);
+    BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager());
+
+    assertTrue(NameNodeAdapter.safeModeInitializedReplQueues(nn));
+    assertEquals(15 - safe, nn.getNamesystem().getUnderReplicatedBlocks());
+
+    cluster.restartDataNodes();
+  }
+
+  /**
+   * Test that, when under-replicated blocks are processed at the end of
+   * safe-mode, blocks currently under construction are not considered
+   * under-construction or missing. Regression test for HDFS-2822.
+   */
+  @Test
+  public void testRbwBlocksNotConsideredUnderReplicated() throws IOException {
+    List<FSDataOutputStream> stms = Lists.newArrayList();
+    try {
+      // Create some junk blocks so that the NN doesn't just immediately
+      // exit safemode on restart.
+      DFSTestUtil.createFile(fs, new Path("/junk-blocks"),
+          BLOCK_SIZE*4, (short)1, 1L);
+      // Create several files which are left open. It's important to
+      // create several here, because otherwise the first iteration of the
+      // replication monitor will pull them off the replication queue and
+      // hide this bug from the test!
+      for (int i = 0; i < 10; i++) {
+        FSDataOutputStream stm = fs.create(
+            new Path("/append-" + i), true, BLOCK_SIZE, (short) 1, BLOCK_SIZE);
+        stms.add(stm);
+        stm.write(1);
+        stm.hflush();
+      }
+
+      cluster.restartNameNode();
+      FSNamesystem ns = cluster.getNameNode(0).getNamesystem();
+      BlockManagerTestUtil.updateState(ns.getBlockManager());
+      assertEquals(0, ns.getPendingReplicationBlocks());
+      assertEquals(0, ns.getCorruptReplicaBlocks());
+      assertEquals(0, ns.getMissingBlocksCount());
+
+    } finally {
+      for (FSDataOutputStream stm : stms) {
+        IOUtils.closeStream(stm);
+      }
+      cluster.shutdown();
+    }
+  }
 
   public interface FSRun {
     public abstract void run(FileSystem fs) throws IOException;
@@ -241,4 +357,4 @@ public void testDatanodeThreshold() throws IOException {
     assertEquals("", cluster.getNamesystem().getSafemode());
   }
 
 }
@@ -30,6 +30,8 @@
 import org.apache.hadoop.util.Daemon;
 import org.junit.Assert;
 
+import com.google.common.base.Preconditions;
+
 public class BlockManagerTestUtil {
   public static void setNodeReplicationLimit(final BlockManager blockManager,
       final int limit) {
@@ -177,4 +179,17 @@ public static void noticeDeadDatanode(NameNode nn, String dnName) {
       namesystem.writeUnlock();
     }
   }
+
+  /**
+   * Change whether the block placement policy will prefer the writer's
+   * local Datanode or not.
+   * @param prefer
+   */
+  public static void setWritingPrefersLocalNode(
+      BlockManager bm, boolean prefer) {
+    BlockPlacementPolicy bpp = bm.getBlockPlacementPolicy();
+    Preconditions.checkState(bpp instanceof BlockPlacementPolicyDefault,
+        "Must use default policy, got %s", bpp.getClass());
+    ((BlockPlacementPolicyDefault)bpp).setPreferLocalNode(prefer);
+  }
 }
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.ipc.Server;
@@ -179,4 +180,28 @@ public static String getMkdirOpPath(FSEditLogOp op) {
     return null;
   }
   }
+
+  /**
+   * @return the number of blocks marked safe by safemode, or -1
+   * if safemode is not running.
+   */
+  public static int getSafeModeSafeBlocks(NameNode nn) {
+    SafeModeInfo smi = nn.getNamesystem().getSafeModeInfoForTests();
+    if (smi == null) {
+      return -1;
+    }
+    return smi.blockSafe;
+  }
+
+  /**
+   * @return true if safemode is not running, or if safemode has already
+   * initialized the replication queues
+   */
+  public static boolean safeModeInitializedReplQueues(NameNode nn) {
+    SafeModeInfo smi = nn.getNamesystem().getSafeModeInfoForTests();
+    if (smi == null) {
+      return true;
+    }
+    return smi.initializedReplQueues;
+  }
 }
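Together with the package-private SafeModeInfo fields exposed in the FSNamesystem hunk earlier in this commit, the two helpers above let a test poll safemode progress directly. A minimal sketch of the intended use, assuming a running MiniDFSCluster named cluster as in TestSafeMode (illustration only):

    // Sketch only: polling the new NameNodeAdapter hooks from a test.
    NameNode nn = cluster.getNameNode();

    // -1 when safemode is not active; otherwise the number of blocks marked safe.
    int safeBlocks = NameNodeAdapter.getSafeModeSafeBlocks(nn);

    // true once safemode has initialized the replication queues (or safemode
    // is off entirely), which is what the HDFS-1476 test asserts.
    boolean replQueuesReady = NameNodeAdapter.safeModeInitializedReplQueues(nn);

    System.out.println("safe blocks = " + safeBlocks
        + ", repl queues initialized = " + replQueuesReady);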
@@ -528,6 +528,15 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3705. ant build fails on 0.23 branch. (Thomas Graves via
     mahadev)
 
+    MAPREDUCE-3691. webservices add support to compress response.
+    (Thomas Graves via mahadev)
+
+    MAPREDUCE-3702. internal server error trying access application master
+    via proxy with filter enabled (Thomas Graves via mahadev)
+
+    MAPREDUCE-3646. Remove redundant URL info from "mapred job" output.
+    (Jonathan Eagles via mahadev)
+
 Release 0.23.0 - 2011-11-01
 
@@ -1216,6 +1216,7 @@ public JobStatus run() throws IOException, InterruptedException,
       }
     });
     state = JobState.RUNNING;
+    LOG.info("The url to track the job: " + getTrackingURL());
    }
 
   /**
@@ -175,7 +175,6 @@ private MRClientProtocol getProxy() throws YarnRemoteException {
             + ":" + addr.getPort()));
         newUgi.addToken(clientToken);
       }
-      LOG.info("The url to track the job: " + application.getTrackingUrl());
       LOG.debug("Connecting to " + serviceAddr);
       final String tempStr = serviceAddr;
       realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
@@ -36,6 +36,7 @@
 import com.google.inject.Provides;
 import com.google.inject.servlet.GuiceFilter;
 import com.google.inject.servlet.ServletModule;
+import com.sun.jersey.api.container.filter.GZIPContentEncodingFilter;
 import com.sun.jersey.api.core.ResourceConfig;
 import com.sun.jersey.core.util.FeaturesAndProperties;
 import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
@@ -160,6 +161,8 @@ public void configureServlets() {
       params.put(ResourceConfig.FEATURE_IMPLICIT_VIEWABLES, "true");
       params.put(ServletContainer.FEATURE_FILTER_FORWARD_ON_404, "true");
       params.put(FeaturesAndProperties.FEATURE_XMLROOTELEMENT_PROCESSING, "true");
+      params.put(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS, GZIPContentEncodingFilter.class.getName());
+      params.put(ResourceConfig.PROPERTY_CONTAINER_RESPONSE_FILTERS, GZIPContentEncodingFilter.class.getName());
       filter("/*").through(GuiceContainer.class, params);
     }
 
@@ -57,7 +57,7 @@ public void init(FilterConfig conf) throws ServletException {
     proxyUriBase = conf.getInitParameter(PROXY_URI_BASE);
   }
 
-  private Set<String> getProxyAddresses() throws ServletException {
+  protected Set<String> getProxyAddresses() throws ServletException {
     long now = System.currentTimeMillis();
     synchronized(this) {
       if(proxyAddresses == null || (lastUpdate + updateInterval) >= now) {
@@ -97,10 +97,13 @@ public void doFilter(ServletRequest req, ServletResponse resp,
     }
 
     String user = null;
-    for(Cookie c: httpReq.getCookies()) {
-      if(WebAppProxyServlet.PROXY_USER_COOKIE_NAME.equals(c.getName())){
-        user = c.getValue();
-        break;
+
+    if (httpReq.getCookies() != null) {
+      for(Cookie c: httpReq.getCookies()) {
+        if(WebAppProxyServlet.PROXY_USER_COOKIE_NAME.equals(c.getName())){
+          user = c.getValue();
+          break;
+        }
       }
     }
     if(user == null) {
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.webproxy.amfilter;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+public class TestAmFilter {
+
+  private String proxyHost = "bogushost.com";
+  private String proxyUri = "http://bogus";
+
+  private class TestAmIpFilter extends AmIpFilter {
+
+    private Set<String> proxyAddresses = null;
+
+    protected Set<String> getProxyAddresses() {
+      if(proxyAddresses == null) {
+        proxyAddresses = new HashSet<String>();
+      }
+      proxyAddresses.add(proxyHost);
+      return proxyAddresses;
+    }
+  }
+
+
+  private static class DummyFilterConfig implements FilterConfig {
+    final Map<String, String> map;
+
+
+    DummyFilterConfig(Map<String,String> map) {
+      this.map = map;
+    }
+
+    @Override
+    public String getFilterName() {
+      return "dummy";
+    }
+    @Override
+    public String getInitParameter(String arg0) {
+      return map.get(arg0);
+    }
+    @Override
+    public Enumeration<String> getInitParameterNames() {
+      return Collections.enumeration(map.keySet());
+    }
+    @Override
+    public ServletContext getServletContext() {
+      return null;
+    }
+  }
+
+
+  @Test
+  public void filterNullCookies() throws Exception {
+    HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
+
+    Mockito.when(request.getCookies()).thenReturn(null);
+    Mockito.when(request.getRemoteAddr()).thenReturn(proxyHost);
+
+    HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+
+    final AtomicBoolean invoked = new AtomicBoolean();
+
+    FilterChain chain = new FilterChain() {
+      @Override
+      public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse)
+          throws IOException, ServletException {
+        invoked.set(true);
+      }
+    };
+
+    Map<String, String> params = new HashMap<String, String>();
+    params.put(AmIpFilter.PROXY_HOST, proxyHost);
+    params.put(AmIpFilter.PROXY_URI_BASE, proxyUri);
+    FilterConfig conf = new DummyFilterConfig(params);
+    Filter filter = new TestAmIpFilter();
+    filter.init(conf);
+    filter.doFilter(request, response, chain);
+    Assert.assertTrue(invoked.get());
+    filter.destroy();
+  }
+}