Revert HDFS-6940.
This commit is contained in:
parent
28d99db992
commit
05af0ff4be
@ -449,8 +449,6 @@ Release 2.6.0 - UNRELEASED
|
||||
HDFS-6376. Distcp data between two HA clusters requires another configuration.
|
||||
(Dave Marion and Haohui Mai via jing9)
|
||||
|
||||
HDFS-6940. Refactoring to allow ConsensusNode implementation. (shv)
|
||||
|
||||
HDFS-6943. Improve NN allocateBlock log to include replicas' datanode IPs.
|
||||
(Ming Ma via wheat9)
|
||||
|
||||
|
@ -164,7 +164,7 @@ public int getPendingDataNodeMessageCount() {
|
||||
final BlocksMap blocksMap;
|
||||
|
||||
/** Replication thread. */
|
||||
Daemon replicationThread;
|
||||
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
|
||||
|
||||
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
|
||||
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
|
||||
@ -263,7 +263,6 @@ public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
|
||||
this.namesystem = namesystem;
|
||||
datanodeManager = new DatanodeManager(this, namesystem, conf);
|
||||
heartbeatManager = datanodeManager.getHeartbeatManager();
|
||||
setReplicationMonitor(new ReplicationMonitor());
|
||||
|
||||
final long pendingPeriod = conf.getLong(
|
||||
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
|
||||
@ -395,23 +394,7 @@ private static BlockTokenSecretManager createBlockTokenSecretManager(
|
||||
lifetimeMin*60*1000L, 0, null, encryptionAlgorithm);
|
||||
}
|
||||
}
|
||||
|
||||
public long getReplicationRecheckInterval() {
|
||||
return replicationRecheckInterval;
|
||||
}
|
||||
|
||||
public AtomicLong excessBlocksCount() {
|
||||
return excessBlocksCount;
|
||||
}
|
||||
|
||||
public void clearInvalidateBlocks() {
|
||||
invalidateBlocks.clear();
|
||||
}
|
||||
|
||||
void setReplicationMonitor(Runnable replicationMonitor) {
|
||||
replicationThread = new Daemon(replicationMonitor);
|
||||
}
|
||||
|
||||
|
||||
public void setBlockPoolId(String blockPoolId) {
|
||||
if (isBlockTokenEnabled()) {
|
||||
blockTokenSecretManager.setBlockPoolId(blockPoolId);
|
||||
@ -1633,7 +1616,7 @@ else if (excessBlocks != null && excessBlocks.contains(block)) {
|
||||
* If there were any replication requests that timed out, reap them
|
||||
* and put them back into the neededReplication queue
|
||||
*/
|
||||
void processPendingReplications() {
|
||||
private void processPendingReplications() {
|
||||
Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
|
||||
if (timedOutItems != null) {
|
||||
namesystem.writeLock();
|
||||
|
@ -1053,7 +1053,7 @@ private void refreshHostsReader(Configuration conf) throws IOException {
|
||||
* 3. Added to exclude --> start decommission.
|
||||
* 4. Removed from exclude --> stop decommission.
|
||||
*/
|
||||
void refreshDatanodes() {
|
||||
private void refreshDatanodes() {
|
||||
for(DatanodeDescriptor node : datanodeMap.values()) {
|
||||
// Check if not include.
|
||||
if (!hostFileManager.isIncluded(node)) {
|
||||
@ -1586,9 +1586,5 @@ public void clearPendingCachingCommands() {
|
||||
public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
|
||||
this.shouldSendCachingCommands = shouldSendCachingCommands;
|
||||
}
|
||||
|
||||
public HostFileManager getHostFileManager() {
|
||||
return this.hostFileManager;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -129,10 +129,6 @@ synchronized boolean hasIncludes() {
|
||||
void refresh(String includeFile, String excludeFile) throws IOException {
|
||||
HostSet newIncludes = readFile("included", includeFile);
|
||||
HostSet newExcludes = readFile("excluded", excludeFile);
|
||||
setHosts(newIncludes, newExcludes);
|
||||
}
|
||||
|
||||
void setHosts(HostSet newIncludes, HostSet newExcludes) {
|
||||
synchronized (this) {
|
||||
includes = newIncludes;
|
||||
excludes = newExcludes;
|
||||
|
@ -978,7 +978,7 @@ private List<AuditLogger> initAuditLoggers(Configuration conf) {
|
||||
return Collections.unmodifiableList(auditLoggers);
|
||||
}
|
||||
|
||||
protected void loadFSImage(StartupOption startOpt) throws IOException {
|
||||
private void loadFSImage(StartupOption startOpt) throws IOException {
|
||||
final FSImage fsImage = getFSImage();
|
||||
|
||||
// format before starting up if requested
|
||||
@ -1026,7 +1026,7 @@ protected void loadFSImage(StartupOption startOpt) throws IOException {
|
||||
imageLoadComplete();
|
||||
}
|
||||
|
||||
protected void startSecretManager() {
|
||||
private void startSecretManager() {
|
||||
if (dtSecretManager != null) {
|
||||
try {
|
||||
dtSecretManager.startThreads();
|
||||
@ -1038,7 +1038,7 @@ protected void startSecretManager() {
|
||||
}
|
||||
}
|
||||
|
||||
protected void startSecretManagerIfNecessary() {
|
||||
private void startSecretManagerIfNecessary() {
|
||||
boolean shouldRun = shouldUseDelegationTokens() &&
|
||||
!isInSafeMode() && getEditLog().isOpenForWrite();
|
||||
boolean running = dtSecretManager.isRunning();
|
||||
@ -1188,7 +1188,7 @@ public boolean inTransitionToActive() {
|
||||
return haEnabled && inActiveState() && startingActiveService;
|
||||
}
|
||||
|
||||
protected boolean shouldUseDelegationTokens() {
|
||||
private boolean shouldUseDelegationTokens() {
|
||||
return UserGroupInformation.isSecurityEnabled() ||
|
||||
alwaysUseDelegationTokensForTests;
|
||||
}
|
||||
@ -2729,7 +2729,6 @@ private LocatedBlock appendFileInternal(FSPermissionChecker pc, String src,
|
||||
* @throws UnresolvedLinkException
|
||||
* @throws IOException
|
||||
*/
|
||||
protected
|
||||
LocatedBlock prepareFileForWrite(String src, INodeFile file,
|
||||
String leaseHolder, String clientMachine,
|
||||
boolean writeToEditLog,
|
||||
@ -3186,7 +3185,6 @@ FileState analyzeFileState(String src,
|
||||
return new FileState(pendingFile, src);
|
||||
}
|
||||
|
||||
protected
|
||||
LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
|
||||
long offset) throws IOException {
|
||||
LocatedBlock lBlk = new LocatedBlock(
|
||||
@ -3304,8 +3302,8 @@ boolean abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
|
||||
return true;
|
||||
}
|
||||
|
||||
protected INodeFile checkLease(String src, String holder, INode inode,
|
||||
long fileId)
|
||||
private INodeFile checkLease(String src, String holder, INode inode,
|
||||
long fileId)
|
||||
throws LeaseExpiredException, FileNotFoundException {
|
||||
assert hasReadLock();
|
||||
final String ident = src + " (inode " + fileId + ")";
|
||||
@ -4422,7 +4420,7 @@ Lease reassignLeaseInternal(Lease lease, String src, String newHolder,
|
||||
return leaseManager.reassignLease(lease, src, newHolder);
|
||||
}
|
||||
|
||||
protected void commitOrCompleteLastBlock(final INodeFile fileINode,
|
||||
private void commitOrCompleteLastBlock(final INodeFile fileINode,
|
||||
final Block commitBlock) throws IOException {
|
||||
assert hasWriteLock();
|
||||
Preconditions.checkArgument(fileINode.isUnderConstruction());
|
||||
@ -4818,7 +4816,6 @@ String getRegistrationID() {
|
||||
* @return an array of datanode commands
|
||||
* @throws IOException
|
||||
*/
|
||||
protected
|
||||
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
|
||||
StorageReport[] reports, long cacheCapacity, long cacheUsed,
|
||||
int xceiverCount, int xmitsInProgress, int failedVolumes)
|
||||
@ -4868,8 +4865,8 @@ void checkAvailableResources() {
|
||||
* @param file
|
||||
* @param logRetryCache
|
||||
*/
|
||||
protected void persistBlocks(String path, INodeFile file,
|
||||
boolean logRetryCache) {
|
||||
private void persistBlocks(String path, INodeFile file,
|
||||
boolean logRetryCache) {
|
||||
assert hasWriteLock();
|
||||
Preconditions.checkArgument(file.isUnderConstruction());
|
||||
getEditLog().logUpdateBlocks(path, file, logRetryCache);
|
||||
@ -5300,7 +5297,7 @@ void setBalancerBandwidth(long bandwidth) throws IOException {
|
||||
* @param path
|
||||
* @param file
|
||||
*/
|
||||
protected void persistNewBlock(String path, INodeFile file) {
|
||||
private void persistNewBlock(String path, INodeFile file) {
|
||||
Preconditions.checkArgument(file.isUnderConstruction());
|
||||
getEditLog().logAddBlock(path, file);
|
||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
@ -7178,7 +7175,7 @@ private void logReassignLease(String leaseHolder, String src,
|
||||
*
|
||||
* @return true if delegation token operation is allowed
|
||||
*/
|
||||
protected boolean isAllowedDelegationTokenOp() throws IOException {
|
||||
private boolean isAllowedDelegationTokenOp() throws IOException {
|
||||
AuthenticationMethod authMethod = getConnectionAuthenticationMethod();
|
||||
if (UserGroupInformation.isSecurityEnabled()
|
||||
&& (authMethod != AuthenticationMethod.KERBEROS)
|
||||
@ -7345,13 +7342,7 @@ public String getLiveNodes() {
|
||||
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
|
||||
blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
|
||||
for (DatanodeDescriptor node : live) {
|
||||
info.put(node.getHostName(), getLiveNodeInfo(node));
|
||||
}
|
||||
return JSON.toString(info);
|
||||
}
|
||||
|
||||
protected Map<String, Object> getLiveNodeInfo(DatanodeDescriptor node) {
|
||||
return ImmutableMap.<String, Object>builder()
|
||||
Map<String, Object> innerinfo = ImmutableMap.<String, Object>builder()
|
||||
.put("infoAddr", node.getInfoAddr())
|
||||
.put("infoSecureAddr", node.getInfoSecureAddr())
|
||||
.put("xferaddr", node.getXferAddr())
|
||||
@ -7369,6 +7360,10 @@ protected Map<String, Object> getLiveNodeInfo(DatanodeDescriptor node) {
|
||||
.put("blockPoolUsedPercent", node.getBlockPoolUsedPercent())
|
||||
.put("volfails", node.getVolumeFailures())
|
||||
.build();
|
||||
|
||||
info.put(node.getHostName(), innerinfo);
|
||||
}
|
||||
return JSON.toString(info);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -7653,16 +7648,17 @@ public ReentrantReadWriteLock getFsLockForTests() {
|
||||
public ReentrantLock getLongReadLockForTests() {
|
||||
return fsLock.longReadLock;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public SafeModeInfo getSafeModeInfoForTests() {
|
||||
return safeMode;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
|
||||
this.nnResourceChecker = nnResourceChecker;
|
||||
}
|
||||
|
||||
public SafeModeInfo getSafeModeInfo() {
|
||||
return safeMode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAvoidingStaleDataNodesForWrite() {
|
||||
return this.blockManager.getDatanodeManager()
|
||||
|
@ -223,7 +223,7 @@ public static FSEditLogOp createMkdirOp(String path) {
|
||||
* if safemode is not running.
|
||||
*/
|
||||
public static int getSafeModeSafeBlocks(NameNode nn) {
|
||||
SafeModeInfo smi = nn.getNamesystem().getSafeModeInfo();
|
||||
SafeModeInfo smi = nn.getNamesystem().getSafeModeInfoForTests();
|
||||
if (smi == null) {
|
||||
return -1;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user