Merge branch 'trunk' into HDFS-6581

This commit is contained in:
arp 2014-09-09 18:07:18 -07:00
commit 0d2f6b4600
11 changed files with 69 additions and 85 deletions

View File

@ -319,9 +319,6 @@ Trunk (Unreleased)
HADOOP-10840. Fix OutOfMemoryError caused by metrics system in Azure File HADOOP-10840. Fix OutOfMemoryError caused by metrics system in Azure File
System. (Shanyu Zhao via cnauroth) System. (Shanyu Zhao via cnauroth)
HADOOP-10925. Compilation fails in native link0 function on Windows.
(cnauroth)
HADOOP-11002. shell escapes are incompatible with previous releases (aw) HADOOP-11002. shell escapes are incompatible with previous releases (aw)
HADOOP-10996. Stop violence in the *_HOME (aw) HADOOP-10996. Stop violence in the *_HOME (aw)
@ -774,6 +771,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-11071. KMSClientProvider should drain the local generated EEK cache HADOOP-11071. KMSClientProvider should drain the local generated EEK cache
on key rollover. (tucu) on key rollover. (tucu)
HADOOP-10925. Compilation fails in native link0 function on Windows.
(cnauroth)
Release 2.5.1 - UNRELEASED Release 2.5.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -446,8 +446,6 @@ Release 2.6.0 - UNRELEASED
HDFS-6376. Distcp data between two HA clusters requires another configuration. HDFS-6376. Distcp data between two HA clusters requires another configuration.
(Dave Marion and Haohui Mai via jing9) (Dave Marion and Haohui Mai via jing9)
HDFS-6940. Refactoring to allow ConsensusNode implementation. (shv)
HDFS-6943. Improve NN allocateBlock log to include replicas' datanode IPs. HDFS-6943. Improve NN allocateBlock log to include replicas' datanode IPs.
(Ming Ma via wheat9) (Ming Ma via wheat9)
@ -634,6 +632,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6951. Correctly persist raw namespace xattrs to edit log and fsimage. HDFS-6951. Correctly persist raw namespace xattrs to edit log and fsimage.
(clamb via wang) (clamb via wang)
HDFS-6506. Newly moved block replica been invalidated and deleted in
TestBalancer. (Binglin Chang via cnauroth)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -537,9 +537,11 @@ private ExitStatus run(int iteration, Formatter formatter,
*/ */
static int run(Collection<URI> namenodes, final Parameters p, static int run(Collection<URI> namenodes, final Parameters p,
Configuration conf) throws IOException, InterruptedException { Configuration conf) throws IOException, InterruptedException {
final long sleeptime = 2000*conf.getLong( final long sleeptime =
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 +
conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
LOG.info("namenodes = " + namenodes); LOG.info("namenodes = " + namenodes);
LOG.info("parameters = " + p); LOG.info("parameters = " + p);

View File

@ -164,7 +164,7 @@ public int getPendingDataNodeMessageCount() {
final BlocksMap blocksMap; final BlocksMap blocksMap;
/** Replication thread. */ /** Replication thread. */
Daemon replicationThread; final Daemon replicationThread = new Daemon(new ReplicationMonitor());
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */ /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap(); final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
@ -263,7 +263,6 @@ public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
this.namesystem = namesystem; this.namesystem = namesystem;
datanodeManager = new DatanodeManager(this, namesystem, conf); datanodeManager = new DatanodeManager(this, namesystem, conf);
heartbeatManager = datanodeManager.getHeartbeatManager(); heartbeatManager = datanodeManager.getHeartbeatManager();
setReplicationMonitor(new ReplicationMonitor());
final long pendingPeriod = conf.getLong( final long pendingPeriod = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY, DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
@ -396,22 +395,6 @@ private static BlockTokenSecretManager createBlockTokenSecretManager(
} }
} }
public long getReplicationRecheckInterval() {
return replicationRecheckInterval;
}
public AtomicLong excessBlocksCount() {
return excessBlocksCount;
}
public void clearInvalidateBlocks() {
invalidateBlocks.clear();
}
void setReplicationMonitor(Runnable replicationMonitor) {
replicationThread = new Daemon(replicationMonitor);
}
public void setBlockPoolId(String blockPoolId) { public void setBlockPoolId(String blockPoolId) {
if (isBlockTokenEnabled()) { if (isBlockTokenEnabled()) {
blockTokenSecretManager.setBlockPoolId(blockPoolId); blockTokenSecretManager.setBlockPoolId(blockPoolId);
@ -1633,7 +1616,7 @@ else if (excessBlocks != null && excessBlocks.contains(block)) {
* If there were any replication requests that timed out, reap them * If there were any replication requests that timed out, reap them
* and put them back into the neededReplication queue * and put them back into the neededReplication queue
*/ */
void processPendingReplications() { private void processPendingReplications() {
Block[] timedOutItems = pendingReplications.getTimedOutBlocks(); Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
if (timedOutItems != null) { if (timedOutItems != null) {
namesystem.writeLock(); namesystem.writeLock();

View File

@ -1053,7 +1053,7 @@ private void refreshHostsReader(Configuration conf) throws IOException {
* 3. Added to exclude --> start decommission. * 3. Added to exclude --> start decommission.
* 4. Removed from exclude --> stop decommission. * 4. Removed from exclude --> stop decommission.
*/ */
void refreshDatanodes() { private void refreshDatanodes() {
for(DatanodeDescriptor node : datanodeMap.values()) { for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include. // Check if not include.
if (!hostFileManager.isIncluded(node)) { if (!hostFileManager.isIncluded(node)) {
@ -1586,9 +1586,5 @@ public void clearPendingCachingCommands() {
public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) { public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
this.shouldSendCachingCommands = shouldSendCachingCommands; this.shouldSendCachingCommands = shouldSendCachingCommands;
} }
public HostFileManager getHostFileManager() {
return this.hostFileManager;
}
} }

View File

@ -129,10 +129,6 @@ synchronized boolean hasIncludes() {
void refresh(String includeFile, String excludeFile) throws IOException { void refresh(String includeFile, String excludeFile) throws IOException {
HostSet newIncludes = readFile("included", includeFile); HostSet newIncludes = readFile("included", includeFile);
HostSet newExcludes = readFile("excluded", excludeFile); HostSet newExcludes = readFile("excluded", excludeFile);
setHosts(newIncludes, newExcludes);
}
void setHosts(HostSet newIncludes, HostSet newExcludes) {
synchronized (this) { synchronized (this) {
includes = newIncludes; includes = newIncludes;
excludes = newExcludes; excludes = newExcludes;

View File

@ -1000,7 +1000,7 @@ private List<AuditLogger> initAuditLoggers(Configuration conf) {
return Collections.unmodifiableList(auditLoggers); return Collections.unmodifiableList(auditLoggers);
} }
protected void loadFSImage(StartupOption startOpt) throws IOException { private void loadFSImage(StartupOption startOpt) throws IOException {
final FSImage fsImage = getFSImage(); final FSImage fsImage = getFSImage();
// format before starting up if requested // format before starting up if requested
@ -1048,7 +1048,7 @@ protected void loadFSImage(StartupOption startOpt) throws IOException {
imageLoadComplete(); imageLoadComplete();
} }
protected void startSecretManager() { private void startSecretManager() {
if (dtSecretManager != null) { if (dtSecretManager != null) {
try { try {
dtSecretManager.startThreads(); dtSecretManager.startThreads();
@ -1060,7 +1060,7 @@ protected void startSecretManager() {
} }
} }
protected void startSecretManagerIfNecessary() { private void startSecretManagerIfNecessary() {
boolean shouldRun = shouldUseDelegationTokens() && boolean shouldRun = shouldUseDelegationTokens() &&
!isInSafeMode() && getEditLog().isOpenForWrite(); !isInSafeMode() && getEditLog().isOpenForWrite();
boolean running = dtSecretManager.isRunning(); boolean running = dtSecretManager.isRunning();
@ -1216,7 +1216,7 @@ public boolean inTransitionToActive() {
return haEnabled && inActiveState() && startingActiveService; return haEnabled && inActiveState() && startingActiveService;
} }
protected boolean shouldUseDelegationTokens() { private boolean shouldUseDelegationTokens() {
return UserGroupInformation.isSecurityEnabled() || return UserGroupInformation.isSecurityEnabled() ||
alwaysUseDelegationTokensForTests; alwaysUseDelegationTokensForTests;
} }
@ -2768,7 +2768,6 @@ private LocatedBlock appendFileInternal(FSPermissionChecker pc, String src,
* @throws UnresolvedLinkException * @throws UnresolvedLinkException
* @throws IOException * @throws IOException
*/ */
protected
LocatedBlock prepareFileForWrite(String src, INodeFile file, LocatedBlock prepareFileForWrite(String src, INodeFile file,
String leaseHolder, String clientMachine, String leaseHolder, String clientMachine,
boolean writeToEditLog, boolean writeToEditLog,
@ -3225,7 +3224,6 @@ FileState analyzeFileState(String src,
return new FileState(pendingFile, src); return new FileState(pendingFile, src);
} }
protected
LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs, LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
long offset) throws IOException { long offset) throws IOException {
LocatedBlock lBlk = new LocatedBlock( LocatedBlock lBlk = new LocatedBlock(
@ -3343,7 +3341,7 @@ boolean abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
return true; return true;
} }
protected INodeFile checkLease(String src, String holder, INode inode, private INodeFile checkLease(String src, String holder, INode inode,
long fileId) long fileId)
throws LeaseExpiredException, FileNotFoundException { throws LeaseExpiredException, FileNotFoundException {
assert hasReadLock(); assert hasReadLock();
@ -4461,7 +4459,7 @@ Lease reassignLeaseInternal(Lease lease, String src, String newHolder,
return leaseManager.reassignLease(lease, src, newHolder); return leaseManager.reassignLease(lease, src, newHolder);
} }
protected void commitOrCompleteLastBlock(final INodeFile fileINode, private void commitOrCompleteLastBlock(final INodeFile fileINode,
final Block commitBlock) throws IOException { final Block commitBlock) throws IOException {
assert hasWriteLock(); assert hasWriteLock();
Preconditions.checkArgument(fileINode.isUnderConstruction()); Preconditions.checkArgument(fileINode.isUnderConstruction());
@ -4857,7 +4855,6 @@ String getRegistrationID() {
* @return an array of datanode commands * @return an array of datanode commands
* @throws IOException * @throws IOException
*/ */
protected
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg, HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, long cacheCapacity, long cacheUsed, StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes) int xceiverCount, int xmitsInProgress, int failedVolumes)
@ -4907,7 +4904,7 @@ void checkAvailableResources() {
* @param file * @param file
* @param logRetryCache * @param logRetryCache
*/ */
protected void persistBlocks(String path, INodeFile file, private void persistBlocks(String path, INodeFile file,
boolean logRetryCache) { boolean logRetryCache) {
assert hasWriteLock(); assert hasWriteLock();
Preconditions.checkArgument(file.isUnderConstruction()); Preconditions.checkArgument(file.isUnderConstruction());
@ -5404,7 +5401,7 @@ void setBalancerBandwidth(long bandwidth) throws IOException {
* @param path * @param path
* @param file * @param file
*/ */
protected void persistNewBlock(String path, INodeFile file) { private void persistNewBlock(String path, INodeFile file) {
Preconditions.checkArgument(file.isUnderConstruction()); Preconditions.checkArgument(file.isUnderConstruction());
getEditLog().logAddBlock(path, file); getEditLog().logAddBlock(path, file);
if (NameNode.stateChangeLog.isDebugEnabled()) { if (NameNode.stateChangeLog.isDebugEnabled()) {
@ -7282,7 +7279,7 @@ private void logReassignLease(String leaseHolder, String src,
* *
* @return true if delegation token operation is allowed * @return true if delegation token operation is allowed
*/ */
protected boolean isAllowedDelegationTokenOp() throws IOException { private boolean isAllowedDelegationTokenOp() throws IOException {
AuthenticationMethod authMethod = getConnectionAuthenticationMethod(); AuthenticationMethod authMethod = getConnectionAuthenticationMethod();
if (UserGroupInformation.isSecurityEnabled() if (UserGroupInformation.isSecurityEnabled()
&& (authMethod != AuthenticationMethod.KERBEROS) && (authMethod != AuthenticationMethod.KERBEROS)
@ -7449,13 +7446,7 @@ public String getLiveNodes() {
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>(); final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
blockManager.getDatanodeManager().fetchDatanodes(live, null, true); blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
for (DatanodeDescriptor node : live) { for (DatanodeDescriptor node : live) {
info.put(node.getHostName(), getLiveNodeInfo(node)); Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
}
return JSON.toString(info);
}
protected Map<String, Object> getLiveNodeInfo(DatanodeDescriptor node) {
return ImmutableMap.<String, Object>builder()
.put("infoAddr", node.getInfoAddr()) .put("infoAddr", node.getInfoAddr())
.put("infoSecureAddr", node.getInfoSecureAddr()) .put("infoSecureAddr", node.getInfoSecureAddr())
.put("xferaddr", node.getXferAddr()) .put("xferaddr", node.getXferAddr())
@ -7473,6 +7464,10 @@ protected Map<String, Object> getLiveNodeInfo(DatanodeDescriptor node) {
.put("blockPoolUsedPercent", node.getBlockPoolUsedPercent()) .put("blockPoolUsedPercent", node.getBlockPoolUsedPercent())
.put("volfails", node.getVolumeFailures()) .put("volfails", node.getVolumeFailures())
.build(); .build();
info.put(node.getHostName(), innerinfo);
}
return JSON.toString(info);
} }
/** /**
@ -7759,12 +7754,13 @@ public ReentrantLock getLongReadLockForTests() {
} }
@VisibleForTesting @VisibleForTesting
public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) { public SafeModeInfo getSafeModeInfoForTests() {
this.nnResourceChecker = nnResourceChecker; return safeMode;
} }
public SafeModeInfo getSafeModeInfo() { @VisibleForTesting
return safeMode; public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
this.nnResourceChecker = nnResourceChecker;
} }
@Override @Override

View File

@ -72,7 +72,7 @@ public class TestBalancer {
((Log4JLogger)Balancer.LOG).getLogger().setLevel(Level.ALL); ((Log4JLogger)Balancer.LOG).getLogger().setLevel(Level.ALL);
} }
final static long CAPACITY = 500L; final static long CAPACITY = 5000L;
final static String RACK0 = "/rack0"; final static String RACK0 = "/rack0";
final static String RACK1 = "/rack1"; final static String RACK1 = "/rack1";
final static String RACK2 = "/rack2"; final static String RACK2 = "/rack2";
@ -85,7 +85,7 @@ public class TestBalancer {
static final long TIMEOUT = 40000L; //msec static final long TIMEOUT = 40000L; //msec
static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5% static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5%
static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta
static final int DEFAULT_BLOCK_SIZE = 10; static final int DEFAULT_BLOCK_SIZE = 100;
private static final Random r = new Random(); private static final Random r = new Random();
static { static {
@ -96,6 +96,7 @@ static void initConf(Configuration conf) {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE); conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
SimulatedFSDataset.setFactory(conf); SimulatedFSDataset.setFactory(conf);
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
} }

View File

@ -223,7 +223,7 @@ public static FSEditLogOp createMkdirOp(String path) {
* if safemode is not running. * if safemode is not running.
*/ */
public static int getSafeModeSafeBlocks(NameNode nn) { public static int getSafeModeSafeBlocks(NameNode nn) {
SafeModeInfo smi = nn.getNamesystem().getSafeModeInfo(); SafeModeInfo smi = nn.getNamesystem().getSafeModeInfoForTests();
if (smi == null) { if (smi == null) {
return -1; return -1;
} }

View File

@ -179,26 +179,8 @@ public AllocateResponse run() throws Exception {
return rm.getApplicationMasterService().allocate(request); return rm.getApplicationMasterService().allocate(request);
} }
}); });
if (response != null) {
// waiting until the AM container is allocated responseQueue.put(response);
while (true) {
if (response != null && ! response.getAllocatedContainers().isEmpty()) {
// get AM container
Container container = response.getAllocatedContainers().get(0);
se.getNmMap().get(container.getNodeId())
.addNewContainer(container, -1L);
// start AM container
amContainer = container;
LOG.debug(MessageFormat.format("Application {0} starts its " +
"AM container ({1}).", appId, amContainer.getId()));
isAMContainerRunning = true;
break;
}
// this sleep time is different from HeartBeat
Thread.sleep(1000);
// send out empty request
sendContainerRequest();
response = responseQueue.take();
} }
} }
@ -206,6 +188,26 @@ public AllocateResponse run() throws Exception {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected void processResponseQueue() protected void processResponseQueue()
throws InterruptedException, YarnException, IOException { throws InterruptedException, YarnException, IOException {
// Check whether receive the am container
if (!isAMContainerRunning) {
if (!responseQueue.isEmpty()) {
AllocateResponse response = responseQueue.take();
if (response != null
&& !response.getAllocatedContainers().isEmpty()) {
// Get AM container
Container container = response.getAllocatedContainers().get(0);
se.getNmMap().get(container.getNodeId())
.addNewContainer(container, -1L);
// Start AM container
amContainer = container;
LOG.debug(MessageFormat.format("Application {0} starts its " +
"AM container ({1}).", appId, amContainer.getId()));
isAMContainerRunning = true;
}
}
return;
}
while (! responseQueue.isEmpty()) { while (! responseQueue.isEmpty()) {
AllocateResponse response = responseQueue.take(); AllocateResponse response = responseQueue.take();
@ -262,6 +264,7 @@ protected void processResponseQueue()
LOG.debug(MessageFormat.format("Application {0} sends out event " + LOG.debug(MessageFormat.format("Application {0} sends out event " +
"to clean up its AM container.", appId)); "to clean up its AM container.", appId));
isFinished = true; isFinished = true;
break;
} }
// check allocated containers // check allocated containers

View File

@ -15,6 +15,9 @@ Trunk - Unreleased
YARN-524 TestYarnVersionInfo failing if generated properties doesn't YARN-524 TestYarnVersionInfo failing if generated properties doesn't
include an SVN URL. (stevel) include an SVN URL. (stevel)
YARN-1471. The SLS simulator is not running the preemption policy
for CapacityScheduler (Carlo Curino via cdouglas)
YARN-2216 TestRMApplicationHistoryWriter sometimes fails in trunk. YARN-2216 TestRMApplicationHistoryWriter sometimes fails in trunk.
(Zhijie Shen via xgong) (Zhijie Shen via xgong)
@ -299,6 +302,9 @@ Release 2.6.0 - UNRELEASED
YARN-2519. Credential Provider related unit tests failed on Windows. YARN-2519. Credential Provider related unit tests failed on Windows.
(Xiaoyu Yao via cnauroth) (Xiaoyu Yao via cnauroth)
YARN-2526. SLS can deadlock when all the threads are taken by AMSimulators.
(Wei Yan via kasha)
Release 2.5.1 - UNRELEASED Release 2.5.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES