HDFS-16648. Add isDebugEnabled check for debug blockLogs in some classes (#4529)
commit 25ccdc77af
parent bd0f9a46e1
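This patch applies two recurring cleanups to debug logging. First, messages built by string concatenation are converted to SLF4J parameterized logging with `{}` placeholders, which defers message formatting until the logger has confirmed that DEBUG is enabled, so the explicit `LOG.isDebugEnabled()` guard around such cheap calls can be dropped. Second, where computing a log argument is itself costly (for example, rendering a whole collection), an `isDebugEnabled()` guard is kept or added so that work is skipped entirely when DEBUG is off. A minimal sketch of both shapes, with illustrative names rather than code taken from the patch:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuardedDebugSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(GuardedDebugSketch.class);

  void respond(int callId, java.util.List<String> queue) {
    // Before: concatenation builds the string even when DEBUG is off,
    // so the guard was needed just to avoid the wasted work.
    if (LOG.isDebugEnabled()) {
      LOG.debug("responding to callId: " + callId);
    }

    // After: the {} placeholder is only substituted once debug() has
    // checked the level internally, so no explicit guard is needed.
    LOG.debug("responding to callId: {}", callId);

    // Guard retained: building the argument (a dump of the queue) is
    // expensive, and parameterized logging cannot avoid that cost.
    if (LOG.isDebugEnabled()) {
      LOG.debug("queued calls: {}", String.join(",", queue));
    }
  }
}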
@@ -287,11 +287,8 @@ public static void registerProtocolEngine(RPC.RpcKind rpcKind,
       throw new IllegalArgumentException("ReRegistration of rpcKind: " +
           rpcKind);
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("rpcKind=" + rpcKind +
-          ", rpcRequestWrapperClass=" + rpcRequestWrapperClass +
-          ", rpcInvoker=" + rpcInvoker);
-    }
+    LOG.debug("rpcKind={}, rpcRequestWrapperClass={}, rpcInvoker={}.",
+        rpcKind, rpcRequestWrapperClass, rpcInvoker);
   }
 
   public Class<? extends Writable> getRpcRequestWrapper(
@@ -1212,9 +1209,7 @@ public Void run() throws Exception {
           deltaNanos = Time.monotonicNowNanos() - startNanos;
           details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
         } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Deferring response for callId: " + this.callId);
-          }
+          LOG.debug("Deferring response for callId: {}", this.callId);
         }
         return null;
       }
@@ -1711,9 +1706,7 @@ private void doRunLoop() {
           // If there were some calls that have not been sent out for a
           // long time, discard them.
           //
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Checking for old call responses.");
-          }
+          LOG.debug("Checking for old call responses.");
           ArrayList<RpcCall> calls;
 
           // get the list of channels from list of keys.
@@ -1813,9 +1806,8 @@ private boolean processResponse(LinkedList<RpcCall> responseQueue,
           //
           call = responseQueue.removeFirst();
           SocketChannel channel = call.connection.channel;
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
-          }
+          LOG.debug("{}: responding to {}.",
+              Thread.currentThread().getName(), call);
           //
           // Send as much data as we can in the non-blocking fashion
           //
@@ -1832,10 +1824,8 @@ private boolean processResponse(LinkedList<RpcCall> responseQueue,
             } else {
               done = false; // more calls pending to be sent.
             }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(Thread.currentThread().getName() + ": responding to " + call
-                  + " Wrote " + numBytes + " bytes.");
-            }
+            LOG.debug("{}: responding to {} Wrote {} bytes.",
+                Thread.currentThread().getName(), call, numBytes);
           } else {
             //
             // If we were unable to write the entire response out, then
@@ -1860,10 +1850,8 @@ private boolean processResponse(LinkedList<RpcCall> responseQueue,
                 decPending();
               }
             }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(Thread.currentThread().getName() + ": responding to " + call
-                  + " Wrote partial " + numBytes + " bytes.");
-            }
+            LOG.debug("{}: responding to {} Wrote partial {} bytes.",
+                Thread.currentThread().getName(), call, numBytes);
           }
           error = false; // everything went off well
         }
@@ -2209,13 +2197,11 @@ private void saslProcess(RpcSaslProto saslMessage)
 
       if (saslServer != null && saslServer.isComplete()) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("SASL server context established. Negotiated QoP is "
-              + saslServer.getNegotiatedProperty(Sasl.QOP));
+          LOG.debug("SASL server context established. Negotiated QoP is {}.",
+              saslServer.getNegotiatedProperty(Sasl.QOP));
         }
         user = getAuthorizedUgi(saslServer.getAuthorizationID());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("SASL server successfully authenticated client: " + user);
-        }
+        LOG.debug("SASL server successfully authenticated client: {}.", user);
         rpcMetrics.incrAuthenticationSuccesses();
         AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user + " from " + toString());
         saslContextEstablished = true;
@@ -2320,10 +2306,8 @@ private RpcSaslProto processSaslToken(RpcSaslProto saslMessage)
         throw new SaslException("Client did not send a token");
       }
       byte[] saslToken = saslMessage.getToken().toByteArray();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Have read input token of size " + saslToken.length
-            + " for processing by saslServer.evaluateResponse()");
-      }
+      LOG.debug("Have read input token of size {} for processing by saslServer.evaluateResponse()",
+          saslToken.length);
       saslToken = saslServer.evaluateResponse(saslToken);
       return buildSaslResponse(
           saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE,
@@ -2338,9 +2322,8 @@ private void switchToSimple() {
 
     private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Will send " + state + " token of size "
-            + ((replyToken != null) ? replyToken.length : null)
-            + " from saslServer.");
+        LOG.debug("Will send {} token of size {} from saslServer.", state,
+            ((replyToken != null) ? replyToken.length : null));
       }
       RpcSaslProto.Builder response = RpcSaslProto.newBuilder();
       response.setState(state);
@@ -2664,10 +2647,8 @@ private void processConnectionContext(RpcWritable.Buffer buffer)
      */
     private void unwrapPacketAndProcessRpcs(byte[] inBuf)
         throws IOException, InterruptedException {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Have read input token of size " + inBuf.length
-            + " for processing by saslServer.unwrap()");
-      }
+      LOG.debug("Have read input token of size {} for processing by saslServer.unwrap()",
+          inBuf.length);
       inBuf = saslServer.unwrap(inBuf, 0, inBuf.length);
       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(
           inBuf));
@@ -2729,9 +2710,7 @@ private void processOneRpc(ByteBuffer bb)
             getMessage(RpcRequestHeaderProto.getDefaultInstance(), buffer);
         callId = header.getCallId();
         retry = header.getRetryCount();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(" got #" + callId);
-        }
+        LOG.debug(" got #{}", callId);
         checkRpcHeaders(header);
 
         if (callId < 0) { // callIds typically used during connection setup
@@ -2746,11 +2725,8 @@ private void processOneRpc(ByteBuffer bb)
       } catch (RpcServerException rse) {
         // inform client of error, but do not rethrow else non-fatal
         // exceptions will close connection!
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(Thread.currentThread().getName() +
-              ": processOneRpc from client " + this +
-              " threw exception [" + rse + "]");
-        }
+        LOG.debug("{}: processOneRpc from client {} threw exception [{}]",
+            Thread.currentThread().getName(), this, rse);
         // use the wrapped exception if there is one.
         Throwable t = (rse.getCause() != null) ? rse.getCause() : rse;
         final RpcCall call = new RpcCall(this, callId, retry);
@@ -2962,9 +2938,7 @@ private void authorizeConnection() throws RpcServerException {
           ProxyUsers.authorize(user, this.getHostAddress());
         }
         authorize(user, protocolName, getHostInetAddress());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Successfully authorized " + connectionContext);
-        }
+        LOG.debug("Successfully authorized {}.", connectionContext);
         rpcMetrics.incrAuthorizationSuccesses();
       } catch (AuthorizationException ae) {
         LOG.info("Connection from " + this
@@ -3081,7 +3055,7 @@ public Handler(int instanceNumber) {
 
     @Override
     public void run() {
-      LOG.debug(Thread.currentThread().getName() + ": starting");
+      LOG.debug("{}: starting", Thread.currentThread().getName());
       SERVER.set(Server.this);
       while (running) {
         TraceScope traceScope = null;
@@ -3115,9 +3089,7 @@ public void run() {
             call = null;
             continue;
           }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
-          }
+          LOG.debug("{}: {} for RpcKind {}.", Thread.currentThread().getName(), call, call.rpcKind);
          CurCall.set(call);
          if (call.span != null) {
            traceScope = tracer.activateSpan(call.span);
@@ -3152,15 +3124,14 @@ public void run() {
           IOUtils.cleanupWithLogger(LOG, traceScope);
           if (call != null) {
             updateMetrics(call, startTimeNanos, connDropped);
-            ProcessingDetails.LOG.debug(
-                "Served: [{}]{} name={} user={} details={}",
+            ProcessingDetails.LOG.debug("Served: [{}]{} name={} user={} details={}",
                 call, (call.isResponseDeferred() ? ", deferred" : ""),
                 call.getDetailedMetricsName(), call.getRemoteUser(),
                 call.getProcessingDetails());
           }
         }
       }
-      LOG.debug(Thread.currentThread().getName() + ": exiting");
+      LOG.debug("{}: exiting", Thread.currentThread().getName());
     }
 
     private void requeueCall(Call call)
@@ -3389,14 +3360,13 @@ private List<AuthMethod> getAuthMethods(SecretManager<?> secretManager,
           " authentication requires a secret manager");
       }
     } else if (secretManager != null) {
-      LOG.debug(AuthenticationMethod.TOKEN +
-          " authentication enabled for secret manager");
+      LOG.debug("{} authentication enabled for secret manager", AuthenticationMethod.TOKEN);
       // most preferred, go to the front of the line!
       authMethods.add(AuthenticationMethod.TOKEN.getAuthMethod());
     }
     authMethods.add(confAuthenticationMethod.getAuthMethod());
 
-    LOG.debug("Server accepts auth methods:" + authMethods);
+    LOG.debug("Server accepts auth methods:{}", authMethods);
     return authMethods;
   }
 
@@ -3556,9 +3526,7 @@ private void wrapWithSasl(RpcCall call) throws IOException {
       synchronized (call.connection.saslServer) {
         token = call.connection.saslServer.wrap(token, 0, token.length);
       }
-      if (LOG.isDebugEnabled())
-        LOG.debug("Adding saslServer wrapped token of size " + token.length
-            + " as call response.");
+      LOG.debug("Adding saslServer wrapped token of size {} as call response.", token.length);
       // rebuild with sasl header and payload
       RpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder()
           .setCallId(AuthProtocol.SASL.callId)
@@ -4004,11 +3972,8 @@ Connection register(SocketChannel channel, int ingressPort,
       Connection connection = new Connection(channel, Time.now(),
           ingressPort, isOnAuxiliaryPort);
       add(connection);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Server connection from " + connection +
-            "; # active connections: " + size() +
-            "; # queued calls: " + callQueue.size());
-      }
+      LOG.debug("Server connection from {}; # active connections: {}; # queued calls: {}.",
+          connection, size(), callQueue.size());
       return connection;
     }
 
@@ -4016,9 +3981,8 @@ boolean close(Connection connection) {
       boolean exists = remove(connection);
       if (exists) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug(Thread.currentThread().getName() +
-              ": disconnecting client " + connection +
-              ". Number of active connections: "+ size());
+          LOG.debug("{}: disconnecting client {}. Number of active connections: {}.",
+              Thread.currentThread().getName(), connection, size());
         }
         // only close if actually removed to avoid double-closing due
         // to possible races
@@ -4080,9 +4044,7 @@ public void run() {
       if (!running) {
         return;
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(Thread.currentThread().getName()+": task running");
-      }
+      LOG.debug("{}: task running", Thread.currentThread().getName());
       try {
         closeIdle(false);
       } finally {

@@ -522,8 +522,7 @@ protected Node chooseRandom(final String scope, String excludedScope,
       }
     }
     if (numOfDatanodes <= 0) {
-      LOG.debug("Failed to find datanode (scope=\"{}\" excludedScope=\"{}\")."
-          + " numOfDatanodes={}",
+      LOG.debug("Failed to find datanode (scope=\"{}\" excludedScope=\"{}\"). numOfDatanodes={}",
           scope, excludedScope, numOfDatanodes);
       return null;
     }
@@ -539,10 +538,12 @@ protected Node chooseRandom(final String scope, String excludedScope,
         netlock.readLock().unlock();
       }
     }
-    LOG.debug("Choosing random from {} available nodes on node {},"
-        + " scope={}, excludedScope={}, excludeNodes={}. numOfDatanodes={}.",
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Choosing random from {} available nodes on node {}, scope={},"
+          + " excludedScope={}, excludeNodes={}. numOfDatanodes={}.",
         availableNodes, innerNode, scope, excludedScope, excludedNodes,
         numOfDatanodes);
+    }
     Node ret = null;
     if (availableNodes > 0) {
       ret = chooseRandom(innerNode, node, excludedNodes, numOfDatanodes,

@@ -479,8 +479,7 @@ public void recoverUnfinalizedSegments() throws IOException {
     LOG.info("Successfully started new epoch " + loggers.getEpoch());
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
-          QuorumCall.mapToString(resps));
+      LOG.debug("newEpoch({}) responses:\n{}", loggers.getEpoch(), QuorumCall.mapToString(resps));
     }
 
     long mostRecentSegmentTxId = Long.MIN_VALUE;
@@ -518,10 +517,7 @@ public void selectInputStreams(Collection<EditLogInputStream> streams,
     // the cache used for RPC calls is not enabled; fall back to using the
     // streaming mechanism to serve such requests
     if (inProgressOk && inProgressTailingEnabled) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Tailing edits starting from txn ID " + fromTxnId +
-            " via RPC mechanism");
-      }
+      LOG.debug("Tailing edits starting from txn ID {} via RPC mechanism", fromTxnId);
       try {
         Collection<EditLogInputStream> rpcStreams = new ArrayList<>();
         selectRpcInputStreams(rpcStreams, fromTxnId, onlyDurableTxns);
@@ -585,8 +581,8 @@ private void selectRpcInputStreams(Collection<EditLogInputStream> streams,
     int maxAllowedTxns = !onlyDurableTxns ? highestTxnCount :
         responseCounts.get(responseCounts.size() - loggers.getMajoritySize());
     if (maxAllowedTxns == 0) {
-      LOG.debug("No new edits available in logs; requested starting from " +
-          "ID {}", fromTxnId);
+      LOG.debug("No new edits available in logs; requested starting from ID {}",
+          fromTxnId);
       return;
     }
     LogAction logAction = selectInputStreamLogHelper.record(fromTxnId);

@@ -1541,6 +1541,7 @@ public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
     if (LOG.isDebugEnabled()) {
       LOG.debug("blocks = {}", java.util.Arrays.asList(blocks));
     }
+
     final AccessMode mode = needBlockToken? BlockTokenIdentifier.AccessMode.READ: null;
 
     LocatedBlockBuilder locatedBlocks = providedStorageMap
@@ -1873,8 +1874,7 @@ public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
     }
 
     if (storage == null) {
-      blockLog.debug("BLOCK* findAndMarkBlockAsCorrupt: {} not found on {}",
-          blk, dn);
+      blockLog.debug("BLOCK* findAndMarkBlockAsCorrupt: {} not found on {}", blk, dn);
       return;
     }
     markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock,
@@ -1893,7 +1893,7 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
       DatanodeStorageInfo storageInfo,
       DatanodeDescriptor node) throws IOException {
     if (b.getStored().isDeleted()) {
-      if(blockLog.isDebugEnabled()) {
+      if (blockLog.isDebugEnabled()) {
         blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
             " corrupt as it does not belong to any file", b);
       }
@@ -1977,7 +1977,7 @@ private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn,
 
     // Check how many copies we have of the block
     if (nr.replicasOnStaleNodes() > 0 && !deleteCorruptReplicaImmediately) {
-      if(blockLog.isDebugEnabled()) {
+      if (blockLog.isDebugEnabled()) {
         blockLog.debug("BLOCK* invalidateBlocks: postponing " +
             "invalidation of {} on {} because {} replica(s) are located on " +
             "nodes with potentially out-of-date block reports", b, dn,
@@ -1990,8 +1990,7 @@ private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn,
       // function and know there are enough live replicas, so we can delete it.
       addToInvalidates(b.getCorrupted(), dn);
       removeStoredBlock(b.getStored(), node);
-      blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
-          b, dn);
+      blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.", b, dn);
       return true;
     }
   }
@@ -2156,13 +2155,11 @@ int computeReconstructionWorkForBlocks(
         for (DatanodeStorageInfo target : targets) {
           targetList.append(' ').append(target.getDatanodeDescriptor());
         }
-        blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNodes(),
-            rw.getBlock(), targetList);
+        blockLog.debug("BLOCK* ask {} to replicate {} to {}",
+            rw.getSrcNodes(), rw.getBlock(), targetList);
       }
     }
-
-    blockLog.debug(
-        "BLOCK* neededReconstruction = {} pendingReconstruction = {}",
+    blockLog.debug("BLOCK* neededReconstruction = {} pendingReconstruction = {}",
         neededReconstruction.size(), pendingReconstruction.size());
   }
 
@@ -2201,7 +2198,7 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
         liveBlockIndices, liveBusyBlockIndices, excludeReconstructed, priority);
     short requiredRedundancy = getExpectedLiveRedundancyNum(block,
         numReplicas);
-    if(srcNodes == null || srcNodes.length == 0) {
+    if (srcNodes == null || srcNodes.length == 0) {
       // block can not be reconstructed from any node
       LOG.debug("Block {} cannot be reconstructed from any node", block);
       NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
@@ -2225,10 +2222,8 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
     int pendingNum = pendingReconstruction.getNumReplicas(block);
     if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {
       neededReconstruction.remove(block, priority);
-      if(blockLog.isDebugEnabled()) {
-        blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
-            " it has enough replicas", block);
-      }
+      blockLog.debug("BLOCK* Removing {} from neededReconstruction as it has enough replicas",
+          block);
       NameNode.getNameNodeMetrics().incNumTimesReReplicationNotScheduled();
       return null;
     }
@@ -2328,10 +2323,8 @@ boolean validateReconstructionWork(BlockReconstructionWork rw) {
     if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {
       neededReconstruction.remove(block, priority);
       rw.resetTargets();
-      if(blockLog.isDebugEnabled()) {
-        blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
-            " it has enough replicas", block);
-      }
+      blockLog.debug("BLOCK* Removing {} from neededReconstruction as it has enough replicas",
+          block);
       return false;
     }
 
@@ -2362,10 +2355,8 @@ boolean validateReconstructionWork(BlockReconstructionWork rw) {
     // The reason we use 'pending' is so we can retry
     // reconstructions that fail after an appropriate amount of time.
     pendingReconstruction.increment(block, targets);
-    if(blockLog.isDebugEnabled()) {
-      blockLog.debug("BLOCK* block {} is moved from neededReconstruction to "
-          + "pendingReconstruction", block);
-    }
+    blockLog.debug("BLOCK* block {} is moved from neededReconstruction to pendingReconstruction",
+        block);
 
     int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
     // remove from neededReconstruction
@@ -2758,9 +2749,11 @@ public void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
       removeBlock(b);
     }
     if (trackBlockCounts) {
+      if (LOG.isDebugEnabled()) {
         LOG.debug("Adjusting safe-mode totals for deletion."
             + "decreasing safeBlocks by {}, totalBlocks by {}",
             numRemovedSafe, numRemovedComplete);
+      }
       bmSafeMode.adjustBlockTotals(-numRemovedSafe, -numRemovedComplete);
     }
   }
@@ -2913,15 +2906,13 @@ public boolean processReport(final DatanodeID nodeID,
       namesystem.writeUnlock("processReport");
     }
 
-    if(blockLog.isDebugEnabled()) {
+    if (blockLog.isDebugEnabled()) {
       for (Block b : invalidatedBlocks) {
-        if(blockLog.isDebugEnabled()) {
           blockLog.debug("BLOCK* processReport 0x{} with lease ID 0x{}: {} on node {} size {} " +
               "does not belong to any file.", strBlockReportId, fullBrLeaseId, b,
               node, b.getNumBytes());
         }
       }
-    }
 
     // Log the block report processing stats from Namenode perspective
     final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
@@ -2951,9 +2942,10 @@ public void removeBRLeaseIfNeeded(final DatanodeID nodeID,
           node.setLastBlockReportTime(now());
           node.setLastBlockReportMonotonic(Time.monotonicNow());
         }
-        LOG.debug("Processing RPC with index {} out of total {} RPCs in "
-            + "processReport 0x{}", context.getCurRpc(),
-            context.getTotalRpcs(), Long.toHexString(context.getReportId()));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Processing RPC with index {} out of total {} RPCs in processReport 0x{}",
+              context.getCurRpc(), context.getTotalRpcs(), Long.toHexString(context.getReportId()));
+        }
       }
     } finally {
       namesystem.writeUnlock("removeBRLeaseIfNeeded");
@@ -2978,14 +2970,16 @@ void rescanPostponedMisreplicatedBlocks() {
 
       BlockInfo bi = getStoredBlock(b);
       if (bi == null) {
+        if (LOG.isDebugEnabled()) {
          LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
              "Postponed mis-replicated block {} no longer found " +
              "in block map.", b);
+        }
        continue;
      }
      MisReplicationResult res = processMisReplicatedBlock(bi);
-      LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
-          "Re-scanned block {}, result is {}", b, res);
+      LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: Re-scanned block {}, result is {}",
+          b, res);
      if (res == MisReplicationResult.POSTPONE) {
        rescannedMisreplicatedBlocks.add(b);
      }
@@ -3077,7 +3071,7 @@ public void markBlockReplicasAsCorrupt(Block oldBlock,
         }
       }
       if (isCorrupt) {
-        if(blockLog.isDebugEnabled()) {
+        if (blockLog.isDebugEnabled()) {
          blockLog.debug("BLOCK* markBlockReplicasAsCorrupt: mark block replica" +
              " {} on {} as corrupt because the dn is not in the new committed " +
              "storage list.", b, storage.getDatanodeDescriptor());
@@ -3113,6 +3107,7 @@ void processFirstBlockReport(
           iblk.getBlockName(), storageInfo.getDatanodeDescriptor(),
           iblk.getNumBytes(), reportedState);
     }
+
     if (shouldPostponeBlocksFromFuture && isGenStampInFuture(iblk)) {
       queueReportedBlock(storageInfo, iblk, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
@@ -3329,9 +3324,11 @@ private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, String reason) {
     assert shouldPostponeBlocksFromFuture;
 
+    if (LOG.isDebugEnabled()) {
      LOG.debug("Queueing reported block {} in state {}" +
          " from datanode {} for later processing because {}.",
          block, reportedState, storageInfo.getDatanodeDescriptor(), reason);
+    }
     pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState);
   }
 
@@ -3598,11 +3595,8 @@ private Block addStoredBlock(final BlockInfo block,
     }
     if (storedBlock == null || storedBlock.isDeleted()) {
       // If this block does not belong to anyfile, then we are done.
-      if(blockLog.isDebugEnabled()) {
-        blockLog.debug("BLOCK* addStoredBlock: {} on {} size {} but it does not" +
-            " belong to any file", block, node, block.getNumBytes());
-      }
+      blockLog.debug("BLOCK* addStoredBlock: {} on {} size {} but it does not belong to any file",
+          block, node, block.getNumBytes());
 
       // we could add this block to invalidate set of this datanode.
       // it will happen in next block report otherwise.
       return block;
@@ -3630,7 +3624,7 @@ private Block addStoredBlock(final BlockInfo block,
       corruptReplicas.removeFromCorruptReplicasMap(block, node,
           Reason.GENSTAMP_MISMATCH);
       curReplicaDelta = 0;
-      if(blockLog.isDebugEnabled()) {
+      if (blockLog.isDebugEnabled()) {
         blockLog.debug("BLOCK* addStoredBlock: Redundant addStoredBlock request"
             + " received for {} on node {} size {}", storedBlock, node,
             storedBlock.getNumBytes());
@@ -3735,10 +3729,8 @@ private void invalidateCorruptReplicas(BlockInfo blk, Block reported,
           removedFromBlocksMap = false;
         }
       } catch (IOException e) {
-        if(blockLog.isDebugEnabled()) {
-          blockLog.debug("invalidateCorruptReplicas error in deleting bad block"
-              + " {} on {}", blk, node, e);
-        }
+        blockLog.debug("invalidateCorruptReplicas error in deleting bad block {} on {}",
+            blk, node, e);
         removedFromBlocksMap = false;
       }
     }
@@ -3920,8 +3912,8 @@ public int processMisReplicatedBlocks(List<BlockInfo> blocks) {
         BlockInfo blk = iter.next();
         MisReplicationResult r = processMisReplicatedBlock(blk);
         processed++;
-        LOG.debug("BLOCK* processMisReplicatedBlocks: " +
-            "Re-scanned block {}, result is {}", blk, r);
+        LOG.debug("BLOCK* processMisReplicatedBlocks: Re-scanned block {}, result is {}",
+            blk, r);
       }
     } finally {
       namesystem.writeUnlock("processMisReplicatedBlocks");
@@ -4187,10 +4179,8 @@ private void processChosenExcessRedundancy(
     //
     final Block blockToInvalidate = getBlockOnStorage(storedBlock, chosen);
     addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor());
-    if(blockLog.isDebugEnabled()) {
-      blockLog.debug("BLOCK* chooseExcessRedundancies: "
-          + "({}, {}) is added to invalidated blocks set", chosen, storedBlock);
-    }
+    blockLog.debug("BLOCK* chooseExcessRedundancies: ({}, {}) is added to invalidated blocks set",
+        chosen, storedBlock);
   }
 
   private void removeStoredBlock(DatanodeStorageInfo storageInfo, Block block,
@@ -4212,8 +4202,8 @@ public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
     assert (namesystem.hasWriteLock());
     {
       if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
-        blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
-            " removed from node {}", storedBlock, node);
+        blockLog.debug("BLOCK* removeStoredBlock: {} has already been removed from node {}",
+            storedBlock, node);
         return;
       }
 
@@ -4225,10 +4215,12 @@ public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
       removed |= node.getCached().remove(cblock);
       removed |= node.getPendingUncached().remove(cblock);
       if (removed) {
+        if (blockLog.isDebugEnabled()) {
          blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching "
              + "related lists on node {}", storedBlock, node);
         }
       }
+      }
 
       //
       // It's possible that the block was removed because of a datanode
@@ -4251,8 +4243,9 @@ private void removeStaleReplicas(List<ReplicaUnderConstruction> staleReplicas,
     for (ReplicaUnderConstruction r : staleReplicas) {
       removeStoredBlock(block,
           r.getExpectedStorageLocation().getDatanodeDescriptor());
-      blockLog.debug("BLOCK* Removing stale replica {} of {}", r,
-          Block.toString(r));
+      if (blockLog.isDebugEnabled()) {
+        blockLog.debug("BLOCK* Removing stale replica {} of {}", r, Block.toString(r));
+      }
     }
   }
   /**
@@ -4380,10 +4373,8 @@ private boolean processAndHandleReportedBlock(
           maxNumBlocksToLog, numBlocksLogged);
     }
     for (Block b : toInvalidate) {
-      if(blockLog.isDebugEnabled()) {
-        blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
-            "belong to any file", b, node, b.getNumBytes());
-      }
+      blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not belong to any file",
+          b, node, b.getNumBytes());
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
@@ -4464,7 +4455,7 @@ private void processIncrementalBlockReport(final DatanodeDescriptor node,
         blockLog.debug("BLOCK* block {}: {} is received from {}",
             rdbi.getStatus(), rdbi.getBlock(), node);
       }
-      if(blockLog.isDebugEnabled()) {
+      if (blockLog.isDebugEnabled()) {
         blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: from "
             + "{} receiving: {}, received: {}, deleted: {}", node, receiving,
             received, deleted);
@@ -4843,8 +4834,10 @@ private int invalidateWorkForOneNode(DatanodeInfo dn) {
     } finally {
      namesystem.writeUnlock("invalidateWorkForOneNode");
     }
-    blockLog.debug("BLOCK* {}: ask {} to delete {}", getClass().getSimpleName(),
-        dn, toInvalidate);
+    if (blockLog.isDebugEnabled()) {
+      blockLog.debug("BLOCK* {}: ask {} to delete {}",
+          getClass().getSimpleName(), dn, toInvalidate);
+    }
     return toInvalidate.size();
   }
 
@@ -5110,8 +5103,8 @@ public void run() {
           }
         }
         if (isSleep) {
-          LOG.debug("Clear markedDeleteQueue over {}" +
-              " millisecond to release the write lock", deleteBlockLockTimeMs);
+          LOG.debug("Clear markedDeleteQueue over {} millisecond to release the write lock",
+              deleteBlockLockTimeMs);
         }
         try {
           Thread.sleep(deleteBlockUnlockIntervalTimeMs);

@@ -238,8 +238,8 @@ DatanodeStorageInfo[] chooseTarget(String src,
       return getPipeline(writer,
           results.toArray(new DatanodeStorageInfo[results.size()]));
     } catch (NotEnoughReplicasException nr) {
-      LOG.debug("Failed to choose with favored nodes (={}), disregard favored"
-          + " nodes hint and retry.", favoredNodes, nr);
+      LOG.debug("Failed to choose with favored nodes (={}), disregard favored nodes hint and retry",
+          favoredNodes, nr);
       // Fall back to regular block placement disregarding favored nodes hint
       return chooseTarget(src, numOfReplicas, writer,
           new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
@@ -715,17 +715,19 @@ protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
         DatanodeDescriptor nextNode = resultStorage.getDatanodeDescriptor();
         if (nextNode != localMachine) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Failed to choose from local rack (location = " + localRack
-                + "), retry with the rack of the next replica (location = "
-                + nextNode.getNetworkLocation() + ")", e);
+            LOG.debug("Failed to choose from local rack (location = {}), retry with the rack "
+                + "of the next replica (location = {})", localRack,
+                nextNode.getNetworkLocation(), e);
           }
           return chooseFromNextRack(nextNode, excludedNodes, blocksize,
               maxNodesPerRack, results, avoidStaleNodes, storageTypes);
         }
       }
 
+      if (LOG.isDebugEnabled()) {
        LOG.debug("Failed to choose from local rack (location = {}); the second"
            + " replica is not found, retry choosing randomly", localRack, e);
+      }
 
       //the second replica is not found, randomly choose one from the network
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
@@ -745,8 +747,9 @@ private DatanodeStorageInfo chooseFromNextRack(Node next,
       return chooseRandom(nextRack, excludedNodes, blocksize, maxNodesPerRack,
           results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
-      LOG.debug("Failed to choose from the next rack (location = {}), "
-          + "retry choosing randomly", nextRack, e);
+      LOG.debug("Failed to choose from the next rack (location = {}), retry choosing randomly",
+          nextRack, e);
+
       // otherwise randomly choose one from the network
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
           maxNodesPerRack, results, avoidStaleNodes, storageTypes);
@@ -775,10 +778,8 @@ protected void chooseRemoteRack(int numOfReplicas,
           excludedNodes, blocksize, maxReplicasPerRack, results,
           avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Failed to choose remote rack (location = ~"
-            + localMachine.getNetworkLocation() + "), fallback to local rack", e);
-      }
+      LOG.debug("Failed to choose remote rack (location = ~{}), fallback to local rack",
+          localMachine.getNetworkLocation(), e);
       chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
           localMachine.getNetworkLocation(), excludedNodes, blocksize,
           maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
@@ -1276,8 +1277,7 @@ public List<DatanodeStorageInfo> chooseReplicasToDelete(
       firstOne = false;
       if (cur == null) {
         LOG.debug(
-            "No excess replica can be found. excessTypes: {}. "
-                + "moreThanOne: {}. exactlyOne: {}.",
+            "No excess replica can be found. excessTypes: {}. moreThanOne: {}. exactlyOne: {}.",
             excessTypes, moreThanOne, exactlyOne);
         break;
       }

@@ -152,11 +152,13 @@ void activate(Configuration conf) {
     executor.scheduleWithFixedDelay(monitor, intervalSecs, intervalSecs,
         TimeUnit.SECONDS);
 
+    if (LOG.isDebugEnabled()) {
      LOG.debug("Activating DatanodeAdminManager with interval {} seconds, " +
          "{} max blocks per interval, " +
          "{} max concurrently tracked nodes.", intervalSecs,
          blocksPerInterval, maxConcurrentTrackedNodes);
     }
+  }
 
   /**
    * Stop the admin monitor thread, waiting briefly for it to terminate.

@ -842,9 +842,7 @@ private void removeDatanode(DatanodeDescriptor nodeInfo,
|
|||||||
decrementVersionCount(nodeInfo.getSoftwareVersion());
|
decrementVersionCount(nodeInfo.getSoftwareVersion());
|
||||||
blockManager.getBlockReportLeaseManager().unregister(nodeInfo);
|
blockManager.getBlockReportLeaseManager().unregister(nodeInfo);
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("remove datanode {}.", nodeInfo);
|
||||||
LOG.debug("remove datanode " + nodeInfo);
|
|
||||||
}
|
|
||||||
blockManager.checkSafeMode();
|
blockManager.checkSafeMode();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -906,8 +904,8 @@ void addDatanode(final DatanodeDescriptor node) {
|
|||||||
resolveUpgradeDomain(node);
|
resolveUpgradeDomain(node);
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(getClass().getSimpleName() + ".addDatanode: "
|
LOG.debug("{}.addDatanode: node {} is added to datanodeMap.",
|
||||||
+ "node " + node + " is added to datanodeMap.");
|
getClass().getSimpleName(), node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -918,9 +916,8 @@ private void wipeDatanode(final DatanodeID node) {
|
|||||||
host2DatanodeMap.remove(datanodeMap.remove(key));
|
host2DatanodeMap.remove(datanodeMap.remove(key));
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(getClass().getSimpleName() + ".wipeDatanode("
|
LOG.debug("{}.wipeDatanode({}): storage {} is removed from datanodeMap.",
|
||||||
+ node + "): storage " + key
|
getClass().getSimpleName(), node, key);
|
||||||
+ " is removed from datanodeMap.");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1189,10 +1186,7 @@ public void registerDatanode(DatanodeRegistration nodeReg)
|
|||||||
// The same datanode has been just restarted to serve the same data
|
// The same datanode has been just restarted to serve the same data
|
||||||
// storage. We do not need to remove old data blocks, the delta will
|
// storage. We do not need to remove old data blocks, the delta will
|
||||||
// be calculated on the next block report from the datanode
|
// be calculated on the next block report from the datanode
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
NameNode.stateChangeLog.debug("BLOCK* registerDatanode: node restarted.");
|
||||||
NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
|
|
||||||
+ "node restarted.");
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// nodeS is found
|
// nodeS is found
|
||||||
/* The registering datanode is a replacement node for the existing
|
/* The registering datanode is a replacement node for the existing
|
||||||
@ -1535,10 +1529,12 @@ void checkIfClusterIsNowMultiRack(DatanodeDescriptor node) {
|
|||||||
"now be replicated cross-rack";
|
"now be replicated cross-rack";
|
||||||
LOG.info(message);
|
LOG.info(message);
|
||||||
} else {
|
} else {
|
||||||
message += "Not checking for mis-replicated blocks because this NN is " +
|
if (LOG.isDebugEnabled()) {
|
||||||
"not yet processing repl queues.";
|
message += "Not checking for mis-replicated blocks because this NN "
|
||||||
|
+ "is not yet processing repl queues.";
|
||||||
LOG.debug(message);
|
LOG.debug(message);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
hasClusterEverBeenMultiRack = true;
|
hasClusterEverBeenMultiRack = true;
|
||||||
if (blockManager.isPopulatingReplQueues()) {
|
if (blockManager.isPopulatingReplQueues()) {
|
||||||
blockManager.processMisReplicatedBlocks();
|
blockManager.processMisReplicatedBlocks();
|
||||||
@ -1659,11 +1655,9 @@ public List<DatanodeDescriptor> getDatanodeListForReport(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("getDatanodeListForReport with " +
|
LOG.debug("getDatanodeListForReport with includedNodes = {}, excludedNodes = {}"
|
||||||
"includedNodes = " + hostConfigManager.getIncludes() +
|
+ ", foundNodes = {}, nodes = {}.", hostConfigManager.getIncludes(),
|
||||||
", excludedNodes = " + hostConfigManager.getExcludes() +
|
hostConfigManager.getExcludes(), foundNodes, nodes);
|
||||||
", foundNodes = " + foundNodes +
|
|
||||||
", nodes = " + nodes);
|
|
||||||
}
|
}
|
||||||
return nodes;
|
return nodes;
|
||||||
}
|
}
|
||||||
@ -1847,10 +1841,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
|||||||
(double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
|
(double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
int numECTasks = (int) Math.ceil(
|
int numECTasks = (int) Math.ceil(
(double) (totalECBlocks * maxTransfers) / totalBlocks);
|
(double) (totalECBlocks * maxTransfers) / totalBlocks);
if (LOG.isDebugEnabled()) {
|
LOG.debug("Pending replication tasks: {} erasure-coded tasks: {}.",
LOG.debug("Pending replication tasks: " + numReplicationTasks
|
numReplicationTasks, numECTasks);
+ " erasure-coded tasks: " + numECTasks);
|
}
|
// check pending replication tasks
|
// check pending replication tasks
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
|
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
numReplicationTasks);
|
numReplicationTasks);
@ -1906,9 +1898,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
|||||||
if (slowPeerTracker.isSlowPeerTrackerEnabled()) {
|
if (slowPeerTracker.isSlowPeerTrackerEnabled()) {
final Map<String, OutlierMetrics> slowPeersMap = slowPeers.getSlowPeers();
|
final Map<String, OutlierMetrics> slowPeersMap = slowPeers.getSlowPeers();
if (!slowPeersMap.isEmpty()) {
|
if (!slowPeersMap.isEmpty()) {
if (LOG.isDebugEnabled()) {
|
LOG.debug("DataNode {} reported slow peers: {}.", nodeReg, slowPeersMap);
LOG.debug("DataNode " + nodeReg + " reported slow peers: " + slowPeersMap);
|
}
|
for (Map.Entry<String, OutlierMetrics> slowNodeEntry : slowPeersMap.entrySet()) {
|
for (Map.Entry<String, OutlierMetrics> slowNodeEntry : slowPeersMap.entrySet()) {
slowPeerTracker.addReport(slowNodeEntry.getKey(), nodeReg.getIpcAddr(false),
|
slowPeerTracker.addReport(slowNodeEntry.getKey(), nodeReg.getIpcAddr(false),
slowNodeEntry.getValue());
|
slowNodeEntry.getValue());
@ -1918,10 +1908,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
|||||||
|
if (slowDiskTracker != null) {
|
if (slowDiskTracker != null) {
if (!slowDisks.getSlowDisks().isEmpty()) {
|
if (!slowDisks.getSlowDisks().isEmpty()) {
if (LOG.isDebugEnabled()) {
|
LOG.debug("DataNode {} reported slow disks: {}.", nodeReg, slowDisks.getSlowDisks());
LOG.debug("DataNode " + nodeReg + " reported slow disks: " +
|
slowDisks.getSlowDisks());
|
}
|
slowDiskTracker.addSlowDiskReport(nodeReg.getIpcAddr(false), slowDisks);
|
slowDiskTracker.addSlowDiskReport(nodeReg.getIpcAddr(false), slowDisks);
}
|
}
slowDiskTracker.checkAndUpdateReportIfNecessary();
|
slowDiskTracker.checkAndUpdateReportIfNecessary();
@ -1950,9 +1937,7 @@ public void handleLifeline(DatanodeRegistration nodeReg,
|||||||
StorageReport[] reports, long cacheCapacity,
|
StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int failedVolumes,
|
long cacheUsed, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
|
VolumeFailureSummary volumeFailureSummary) throws IOException {
if (LOG.isDebugEnabled()) {
|
LOG.debug("Received handleLifeline from nodeReg = {}.", nodeReg);
LOG.debug("Received handleLifeline from nodeReg = " + nodeReg);
|
}
|
DatanodeDescriptor nodeinfo = getDatanode(nodeReg);
|
DatanodeDescriptor nodeinfo = getDatanode(nodeReg);
if (nodeinfo == null || !nodeinfo.isRegistered()) {
|
if (nodeinfo == null || !nodeinfo.isRegistered()) {
// This can happen if the lifeline message comes when DataNode is either
|
// This can happen if the lifeline message comes when DataNode is either
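handleHeartbeat and handleLifeline run once per DataNode heartbeat, so per-call concatenation in the hunks above was pure overhead whenever DEBUG was off. A self-contained sketch that makes the deferral visible by counting toString() invocations (it assumes DEBUG is disabled for this logger, the usual production setting):

    import java.util.concurrent.atomic.AtomicInteger;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class DeferredFormattingSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(DeferredFormattingSketch.class);
      private static final AtomicInteger TO_STRING_CALLS = new AtomicInteger();

      private static final Object TRACKED = new Object() {
        @Override
        public String toString() {
          TO_STRING_CALLS.incrementAndGet();
          return "tracked";
        }
      };

      public static void main(String[] args) {
        // The parameterized call never touches its argument while DEBUG
        // is disabled; the concatenating call invokes toString() anyway.
        LOG.debug("parameterized: {}", TRACKED);
        LOG.debug("concatenated: " + TRACKED);
        System.out.println("toString() calls: " + TO_STRING_CALLS.get()); // 1
      }
    }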
@ -976,9 +976,8 @@ private void setCachedLocations(LocatedBlock block) {
|||||||
public final void processCacheReport(final DatanodeID datanodeID,
|
public final void processCacheReport(final DatanodeID datanodeID,
final List<Long> blockIds) throws IOException {
|
final List<Long> blockIds) throws IOException {
if (!enabled) {
|
if (!enabled) {
LOG.debug("Ignoring cache report from {} because {} = false. " +
|
LOG.debug("Ignoring cache report from {} because {} = false. number of blocks: {}",
"number of blocks: {}", datanodeID,
|
datanodeID, DFS_NAMENODE_CACHING_ENABLED_KEY, blockIds.size());
DFS_NAMENODE_CACHING_ENABLED_KEY, blockIds.size());
|
return;
|
return;
}
|
}
namesystem.writeLock();
|
namesystem.writeLock();
@ -1003,9 +1002,8 @@ public final void processCacheReport(final DatanodeID datanodeID,
|||||||
if (metrics != null) {
|
if (metrics != null) {
metrics.addCacheBlockReport((int) (endTime - startTime));
|
metrics.addCacheBlockReport((int) (endTime - startTime));
}
|
}
LOG.debug("Processed cache report from {}, blocks: {}, " +
|
LOG.debug("Processed cache report from {}, blocks: {}, processing time: {} msecs",
"processing time: {} msecs", datanodeID, blockIds.size(),
|
datanodeID, blockIds.size(), (endTime - startTime));
(endTime - startTime));
|
}
|
}
|
private void processCacheReportImpl(final DatanodeDescriptor datanode,
|
private void processCacheReportImpl(final DatanodeDescriptor datanode,
@ -649,7 +649,7 @@ void openEditLogForWrite(int layoutVersion) throws IOException {
|||||||
*/
|
*/
void reloadFromImageFile(File file, FSNamesystem target) throws IOException {
|
void reloadFromImageFile(File file, FSNamesystem target) throws IOException {
target.clear();
|
target.clear();
LOG.debug("Reloading namespace from " + file);
|
LOG.debug("Reloading namespace from {}.", file);
loadFSImage(file, target, null, false);
|
loadFSImage(file, target, null, false);
}
|
}
|
@ -728,7 +728,7 @@ LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
|||||||
}
|
}
|
for (EditLogInputStream l : editStreams) {
|
for (EditLogInputStream l : editStreams) {
LOG.debug("Planning to load edit log stream: " + l);
|
LOG.debug("Planning to load edit log stream: {}.", l);
}
|
}
if (!editStreams.iterator().hasNext()) {
|
if (!editStreams.iterator().hasNext()) {
LOG.info("No edit log streams selected.");
|
LOG.info("No edit log streams selected.");
@ -892,7 +892,9 @@ public long loadEdits(Iterable<EditLogInputStream> editStreams,
|||||||
FSNamesystem target, long maxTxnsToRead,
|
FSNamesystem target, long maxTxnsToRead,
StartupOption startOpt, MetaRecoveryContext recovery)
|
StartupOption startOpt, MetaRecoveryContext recovery)
throws IOException {
|
throws IOException {
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
|
if (LOG.isDebugEnabled()) {
|
LOG.debug("About to load edits:\n {}.", Joiner.on("\n ").join(editStreams));
|
}
|
long prevLastAppliedTxId = lastAppliedTxId;
|
long prevLastAppliedTxId = lastAppliedTxId;
long remainingReadTxns = maxTxnsToRead;
|
long remainingReadTxns = maxTxnsToRead;
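Note the opposite direction in the loadEdits hunk above: a guard is added rather than removed. Parameterized logging defers formatting, but the argument expression itself, here Joiner.on("\n  ").join(editStreams), is still evaluated eagerly at the call site, so only an isDebugEnabled() check can skip that work. A sketch of the idiom, with String.join standing in for Guava's Joiner:

    import java.util.List;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ExpensiveArgumentSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(ExpensiveArgumentSketch.class);

      void logStreams(List<String> editStreams) {
        // The joined string is built before debug() is even entered,
        // so the guard is what saves the work when DEBUG is off.
        if (LOG.isDebugEnabled()) {
          LOG.debug("About to load edits:\n  {}",
              String.join("\n  ", editStreams));
        }
      }
    }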
@ -1348,10 +1350,10 @@ private void renameImageFileInDir(StorageDirectory sd, NameNodeFile fromNnf,
|||||||
final File fromFile = NNStorage.getStorageFile(sd, fromNnf, txid);
|
final File fromFile = NNStorage.getStorageFile(sd, fromNnf, txid);
final File toFile = NNStorage.getStorageFile(sd, toNnf, txid);
|
final File toFile = NNStorage.getStorageFile(sd, toNnf, txid);
// renameTo fails on Windows if the destination file already exists.
|
// renameTo fails on Windows if the destination file already exists.
if(LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
LOG.debug("renaming " + fromFile.getAbsolutePath()
|
LOG.debug("renaming {} to {}", fromFile.getAbsoluteFile(), toFile.getAbsolutePath());
+ " to " + toFile.getAbsolutePath());
|
}
|
}
|
if (!fromFile.renameTo(toFile)) {
|
if (!fromFile.renameTo(toFile)) {
if (!toFile.delete() || !fromFile.renameTo(toFile)) {
|
if (!toFile.delete() || !fromFile.renameTo(toFile)) {
throw new IOException("renaming " + fromFile.getAbsolutePath() + " to " +
|
throw new IOException("renaming " + fromFile.getAbsolutePath() + " to " +
@ -1407,8 +1407,8 @@ void startActiveServices() throws IOException {
|||||||
}
|
}
|
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
LOG.debug("NameNode metadata after re-processing " +
|
LOG.debug("NameNode metadata after re-processing {}"
"replication and invalidation queues during failover:\n" +
|
+ "replication and invalidation queues during failover:\n",
metaSaveAsString());
|
metaSaveAsString());
}
|
}
|
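The startActiveServices hunk keeps its guard for the same reason: metaSaveAsString() renders a full metadata dump, and the plain SLF4J 1.x Logger API has no Supplier-based lazy arguments, so an explicit check is the only way to avoid computing the dump. A sketch under that assumption (the dump method below is a stand-in, not the real implementation):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class GuardedDumpSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(GuardedDumpSketch.class);

      // Stand-in for metaSaveAsString(), which walks the NameNode
      // metadata and renders it into one large String.
      private String metaSaveAsString() {
        return "<metadata dump>";
      }

      void logFailoverState() {
        // {} defers formatting, but metaSaveAsString() would still run
        // unconditionally without the surrounding guard.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Metadata after failover:\n{}", metaSaveAsString());
        }
      }
    }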
@ -2675,8 +2675,8 @@ CryptoProtocolVersion chooseProtocolVersion(
|||||||
|
for (CryptoProtocolVersion c : supportedVersions) {
|
for (CryptoProtocolVersion c : supportedVersions) {
if (c.equals(CryptoProtocolVersion.UNKNOWN)) {
|
if (c.equals(CryptoProtocolVersion.UNKNOWN)) {
LOG.debug("Ignoring unknown CryptoProtocolVersion provided by " +
|
LOG.debug("Ignoring unknown CryptoProtocolVersion provided by client: {}",
"client: {}", c.getUnknownValue());
|
c.getUnknownValue());
continue;
|
continue;
}
|
}
if (c.equals(required)) {
|
if (c.equals(required)) {
@ -2987,8 +2987,7 @@ LastBlockWithStatus appendFile(String srcArg, String holder,
|||||||
requireEffectiveLayoutVersionForFeature(Feature.APPEND_NEW_BLOCK);
|
requireEffectiveLayoutVersionForFeature(Feature.APPEND_NEW_BLOCK);
}
|
}
|
NameNode.stateChangeLog.debug(
|
NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: src={}, holder={}, clientMachine={}",
"DIR* NameSystem.appendFile: src={}, holder={}, clientMachine={}",
|
srcArg, holder, clientMachine);
|
srcArg, holder, clientMachine);
try {
|
try {
boolean skipSync = false;
|
boolean skipSync = false;
@ -3045,8 +3044,8 @@ LocatedBlock getAdditionalBlock(
|||||||
DatanodeInfo[] excludedNodes, String[] favoredNodes,
|
DatanodeInfo[] excludedNodes, String[] favoredNodes,
EnumSet<AddBlockFlag> flags) throws IOException {
|
EnumSet<AddBlockFlag> flags) throws IOException {
final String operationName = "getAdditionalBlock";
|
final String operationName = "getAdditionalBlock";
NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {} inodeId {}" +
|
NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {} inodeId {} for {}",
" for {}", src, fileId, clientName);
|
src, fileId, clientName);
|
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
FSDirWriteFileOp.ValidateAddBlockResult r;
|
FSDirWriteFileOp.ValidateAddBlockResult r;
@ -3148,8 +3147,7 @@ LocatedBlock getAdditionalDatanode(String src, long fileId,
|||||||
*/
|
*/
void abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
|
void abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
throws IOException {
|
throws IOException {
NameNode.stateChangeLog.debug(
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: {} of file {}", b, src);
"BLOCK* NameSystem.abandonBlock: {} of file {}", b, src);
|
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
|
final FSPermissionChecker pc = getPermissionChecker();
FSPermissionChecker.setOperationType(null);
|
FSPermissionChecker.setOperationType(null);
@ -3158,8 +3156,8 @@ void abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
|||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot abandon block " + b + " for file" + src);
|
checkNameNodeSafeMode("Cannot abandon block " + b + " for file" + src);
FSDirWriteFileOp.abandonBlock(dir, pc, b, fileId, src, holder);
|
FSDirWriteFileOp.abandonBlock(dir, pc, b, fileId, src, holder);
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: {} is " +
|
NameNode.stateChangeLog.debug(
"removed from pendingCreates", b);
|
"BLOCK* NameSystem.abandonBlock: {} is removed from pendingCreates", b);
} finally {
|
} finally {
writeUnlock("abandonBlock");
|
writeUnlock("abandonBlock");
}
|
}
@ -4016,9 +4014,10 @@ void commitBlockSynchronization(ExtendedBlock oldBlock,
|||||||
if ((!iFile.isUnderConstruction() || storedBlock.isComplete()) &&
|
if ((!iFile.isUnderConstruction() || storedBlock.isComplete()) &&
iFile.getLastBlock().isComplete()) {
|
iFile.getLastBlock().isComplete()) {
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected block (={}) since the file (={}) is not "
|
LOG.debug("Unexpected block (={}) since the file (={}) is not under construction",
+ "under construction", oldBlock, iFile.getLocalName());
|
oldBlock, iFile.getLocalName());
}
|
}
|
return;
|
return;
}
|
}
|
@ -4478,8 +4477,8 @@ private void closeFile(String path, INodeFile file) {
|||||||
assert hasWriteLock();
|
assert hasWriteLock();
// file is closed
|
// file is closed
getEditLog().logCloseFile(path, file);
|
getEditLog().logCloseFile(path, file);
NameNode.stateChangeLog.debug("closeFile: {} with {} blocks is persisted" +
|
NameNode.stateChangeLog.debug("closeFile: {} with {} blocks is persisted to the file system",
" to the file system", path, file.getBlocks().length);
|
path, file.getBlocks().length);
}
|
}
|
/**
|
/**
@ -6107,9 +6106,7 @@ Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
|||||||
if (cookieTab[0] == null) {
|
if (cookieTab[0] == null) {
cookieTab[0] = String.valueOf(getIntCookie(cookieTab[0]));
|
cookieTab[0] = String.valueOf(getIntCookie(cookieTab[0]));
}
|
}
if (LOG.isDebugEnabled()) {
|
LOG.debug("there are no corrupt file blocks.");
|
LOG.debug("there are no corrupt file blocks.");
}
|
return corruptFiles;
|
return corruptFiles;
}
|
}
|
@ -810,10 +810,8 @@ public HdfsFileStatus create(String src, FsPermission masked,
|||||||
throws IOException {
|
throws IOException {
checkNNStartup();
|
checkNNStartup();
String clientMachine = getClientMachine();
|
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
|
stateChangeLog.debug("*DIR* NameNode.create: file {} for {} at {}.",
stateChangeLog.debug("*DIR* NameNode.create: file "
|
src, clientName, clientMachine);
+src+" for "+clientName+" at "+clientMachine);
|
}
|
if (!checkPathLength(src)) {
|
if (!checkPathLength(src)) {
throw new IOException("create: Pathname too long. Limit "
|
throw new IOException("create: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
@ -845,10 +843,8 @@ public LastBlockWithStatus append(String src, String clientName,
|||||||
EnumSetWritable<CreateFlag> flag) throws IOException {
|
EnumSetWritable<CreateFlag> flag) throws IOException {
checkNNStartup();
|
checkNNStartup();
String clientMachine = getClientMachine();
|
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
|
stateChangeLog.debug("*DIR* NameNode.append: file {} for {} at {}.",
stateChangeLog.debug("*DIR* NameNode.append: file "
|
src, clientName, clientMachine);
+src+" for "+clientName+" at "+clientMachine);
|
}
|
namesystem.checkOperation(OperationCategory.WRITE);
|
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
|
CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
@ -894,8 +890,8 @@ public void unsetStoragePolicy(String src)
|||||||
public void setStoragePolicy(String src, String policyName)
|
public void setStoragePolicy(String src, String policyName)
throws IOException {
|
throws IOException {
checkNNStartup();
|
checkNNStartup();
stateChangeLog.debug("*DIR* NameNode.setStoragePolicy for path: {}, " +
|
stateChangeLog.debug("*DIR* NameNode.setStoragePolicy for path: {}, policyName: {}",
"policyName: {}", src, policyName);
|
src, policyName);
namesystem.setStoragePolicy(src, policyName);
|
namesystem.setStoragePolicy(src, policyName);
}
|
}
|
@ -949,13 +945,9 @@ public LocatedBlock getAdditionalDatanode(final String src,
|||||||
) throws IOException {
|
) throws IOException {
checkNNStartup();
|
checkNNStartup();
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
LOG.debug("getAdditionalDatanode: src=" + src
|
LOG.debug("getAdditionalDatanode: src={}, fileId={}, blk={}, existings={}, excludes={}"
+ ", fileId=" + fileId
|
+ ", numAdditionalNodes={}, clientName={}", src, fileId, blk, Arrays.asList(existings),
+ ", blk=" + blk
|
Arrays.asList(excludes), numAdditionalNodes, clientName);
+ ", existings=" + Arrays.asList(existings)
|
+ ", excludes=" + Arrays.asList(excludes)
|
+ ", numAdditionalNodes=" + numAdditionalNodes
|
+ ", clientName=" + clientName);
|
}
|
}
|
metrics.incrGetAdditionalDatanodeOps();
|
metrics.incrGetAdditionalDatanodeOps();
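Likewise, getAdditionalDatanode above retains its guard even though the message now uses placeholders: the two Arrays.asList(...) calls allocate wrapper lists on every invocation, and parameterized logging cannot prevent that, because arguments are evaluated before debug() is called. A sketch of the caveat:

    import java.util.Arrays;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class EagerArgumentSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(EagerArgumentSketch.class);

      void report(String[] existings, String[] excludes) {
        // Even with {} placeholders, the argument expressions run
        // first: both Arrays.asList calls allocate on every invocation.
        // The guard keeps that off the hot path when DEBUG is disabled.
        if (LOG.isDebugEnabled()) {
          LOG.debug("existings={}, excludes={}",
              Arrays.asList(existings), Arrays.asList(excludes));
        }
      }
    }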
@ -1053,9 +1045,7 @@ public long getPreferredBlockSize(String filename)
|||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
public boolean rename(String src, String dst) throws IOException {
|
public boolean rename(String src, String dst) throws IOException {
checkNNStartup();
|
checkNNStartup();
if(stateChangeLog.isDebugEnabled()) {
|
stateChangeLog.debug("*DIR* NameNode.rename: {} to {}.", src, dst);
stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
|
}
|
if (!checkPathLength(dst)) {
|
if (!checkPathLength(dst)) {
throw new IOException("rename: Pathname too long. Limit "
|
throw new IOException("rename: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
@ -1081,8 +1071,10 @@ public boolean rename(String src, String dst) throws IOException {
|||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
public void concat(String trg, String[] src) throws IOException {
|
public void concat(String trg, String[] src) throws IOException {
checkNNStartup();
|
checkNNStartup();
stateChangeLog.debug("*DIR* NameNode.concat: src path {} to" +
|
if (stateChangeLog.isDebugEnabled()) {
" target path {}", Arrays.toString(src), trg);
|
stateChangeLog.debug("*DIR* NameNode.concat: src path {} to target path {}",
|
Arrays.toString(src), trg);
|
}
namesystem.checkOperation(OperationCategory.WRITE);
|
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = getCacheEntry();
|
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
@ -1102,9 +1094,7 @@ public void concat(String trg, String[] src) throws IOException {
|||||||
public void rename2(String src, String dst, Options.Rename... options)
|
public void rename2(String src, String dst, Options.Rename... options)
throws IOException {
|
throws IOException {
checkNNStartup();
|
checkNNStartup();
if(stateChangeLog.isDebugEnabled()) {
|
stateChangeLog.debug("*DIR* NameNode.rename: {} to {}.", src, dst);
stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
|
}
|
if (!checkPathLength(dst)) {
|
if (!checkPathLength(dst)) {
throw new IOException("rename: Pathname too long. Limit "
|
throw new IOException("rename: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
@ -1128,8 +1118,7 @@ public void rename2(String src, String dst, Options.Rename... options)
|||||||
public boolean truncate(String src, long newLength, String clientName)
|
public boolean truncate(String src, long newLength, String clientName)
throws IOException {
|
throws IOException {
checkNNStartup();
|
checkNNStartup();
stateChangeLog
|
stateChangeLog.debug("*DIR* NameNode.truncate: {} to {}", src, newLength);
.debug("*DIR* NameNode.truncate: " + src + " to " + newLength);
|
String clientMachine = getClientMachine();
|
String clientMachine = getClientMachine();
try {
|
try {
return namesystem.truncate(
|
return namesystem.truncate(
@ -1142,10 +1131,7 @@ public boolean truncate(String src, long newLength, String clientName)
|||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
public boolean delete(String src, boolean recursive) throws IOException {
|
public boolean delete(String src, boolean recursive) throws IOException {
checkNNStartup();
|
checkNNStartup();
if (stateChangeLog.isDebugEnabled()) {
|
stateChangeLog.debug("*DIR* Namenode.delete: src={}, recursive={}.", src, recursive);
stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
|
+ ", recursive=" + recursive);
|
}
|
namesystem.checkOperation(OperationCategory.WRITE);
|
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntry cacheEntry = getCacheEntry();
|
CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
|
if (cacheEntry != null && cacheEntry.isSuccess()) {
@ -1178,9 +1164,7 @@ private boolean checkPathLength(String src) {
|||||||
public boolean mkdirs(String src, FsPermission masked, boolean createParent)
|
public boolean mkdirs(String src, FsPermission masked, boolean createParent)
throws IOException {
|
throws IOException {
checkNNStartup();
|
checkNNStartup();
if(stateChangeLog.isDebugEnabled()) {
|
stateChangeLog.debug("*DIR* NameNode.mkdirs: {}.", src);
stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
|
}
|
if (!checkPathLength(src)) {
|
if (!checkPathLength(src)) {
throw new IOException("mkdirs: Pathname too long. Limit "
|
throw new IOException("mkdirs: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
|
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
@ -1649,10 +1633,8 @@ public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
|||||||
final BlockReportContext context) throws IOException {
|
final BlockReportContext context) throws IOException {
checkNNStartup();
|
checkNNStartup();
verifyRequest(nodeReg);
|
verifyRequest(nodeReg);
if(blockStateChangeLog.isDebugEnabled()) {
|
blockStateChangeLog.debug("*BLOCK* NameNode.blockReport: from {}, reports.length={}.",
blockStateChangeLog.debug("*BLOCK* NameNode.blockReport: "
|
nodeReg, reports.length);
+ "from " + nodeReg + ", reports.length=" + reports.length);
|
}
|
final BlockManager bm = namesystem.getBlockManager();
|
final BlockManager bm = namesystem.getBlockManager();
boolean noStaleStorages = false;
|
boolean noStaleStorages = false;
try {
|
try {
@ -1695,10 +1677,8 @@ public DatanodeCommand cacheReport(DatanodeRegistration nodeReg,
|||||||
String poolId, List<Long> blockIds) throws IOException {
|
String poolId, List<Long> blockIds) throws IOException {
checkNNStartup();
|
checkNNStartup();
verifyRequest(nodeReg);
|
verifyRequest(nodeReg);
if (blockStateChangeLog.isDebugEnabled()) {
|
blockStateChangeLog.debug("*BLOCK* NameNode.cacheReport: from {} {} blocks",
blockStateChangeLog.debug("*BLOCK* NameNode.cacheReport: "
|
nodeReg, blockIds.size());
+ "from " + nodeReg + " " + blockIds.size() + " blocks");
|
}
|
namesystem.getCacheManager().processCacheReport(nodeReg, blockIds);
|
namesystem.getCacheManager().processCacheReport(nodeReg, blockIds);
return null;
|
return null;
}
|
}
@ -1710,11 +1690,8 @@ public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg,
|||||||
checkNNStartup();
|
checkNNStartup();
verifyRequest(nodeReg);
|
verifyRequest(nodeReg);
metrics.incrBlockReceivedAndDeletedOps();
|
metrics.incrBlockReceivedAndDeletedOps();
if(blockStateChangeLog.isDebugEnabled()) {
|
blockStateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: from {} {} blocks.",
blockStateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
|
nodeReg, receivedAndDeletedBlocks.length);
+"from "+nodeReg+" "+receivedAndDeletedBlocks.length
|
+" blocks.");
|
}
|
final BlockManager bm = namesystem.getBlockManager();
|
final BlockManager bm = namesystem.getBlockManager();
for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
|
for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
bm.enqueueBlockOp(new Runnable() {
|
bm.enqueueBlockOp(new Runnable() {
@ -1842,9 +1819,7 @@ public Collection<RefreshResponse> refresh(String identifier, String[] args) {
|||||||
|
@Override // GetUserMappingsProtocol
|
@Override // GetUserMappingsProtocol
public String[] getGroupsForUser(String user) throws IOException {
|
public String[] getGroupsForUser(String user) throws IOException {
if (LOG.isDebugEnabled()) {
|
LOG.debug("Getting groups for user {}", user);
LOG.debug("Getting groups for user " + user);
|
}
|
return UserGroupInformation.createRemoteUser(user).getGroupNames();
|
return UserGroupInformation.createRemoteUser(user).getGroupNames();
}
|
}
|
@ -2022,9 +1997,9 @@ public void disallowSnapshot(String snapshot) throws IOException {
|||||||
public void renameSnapshot(String snapshotRoot, String snapshotOldName,
|
public void renameSnapshot(String snapshotRoot, String snapshotOldName,
String snapshotNewName) throws IOException {
|
String snapshotNewName) throws IOException {
checkNNStartup();
|
checkNNStartup();
LOG.debug("*DIR* NameNode.renameSnapshot: Snapshot Path {}, " +
|
LOG.debug(
"snapshotOldName {}, snapshotNewName {}", snapshotRoot,
|
"*DIR* NameNode.renameSnapshot: Snapshot Path {},snapshotOldName {}, snapshotNewName {}",
snapshotOldName, snapshotNewName);
|
snapshotRoot, snapshotOldName, snapshotNewName);
if (snapshotNewName == null || snapshotNewName.isEmpty()) {
|
if (snapshotNewName == null || snapshotNewName.isEmpty()) {
throw new IOException("The new snapshot name is null or empty.");
|
throw new IOException("The new snapshot name is null or empty.");
}
|
}
@ -2318,8 +2293,7 @@ public void setErasureCodingPolicy(String src, String ecPolicyName)
|||||||
try {
|
try {
if (ecPolicyName == null) {
|
if (ecPolicyName == null) {
ecPolicyName = defaultECPolicyName;
|
ecPolicyName = defaultECPolicyName;
LOG.debug("No policy name is specified, " +
|
LOG.debug("No policy name is specified, set the default policy name instead");
"set the default policy name instead");
|
}
|
}
LOG.debug("Set erasure coding policy {} on {}", ecPolicyName, src);
|
LOG.debug("Set erasure coding policy {} on {}", ecPolicyName, src);
namesystem.setErasureCodingPolicy(src, ecPolicyName, cacheEntry != null);
|
namesystem.setErasureCodingPolicy(src, ecPolicyName, cacheEntry != null);
@ -262,9 +262,7 @@ public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
|||||||
nnCount = nns.size();
|
nnCount = nns.size();
// setup the iterator to endlessly loop the nns
|
// setup the iterator to endlessly loop the nns
this.nnLookup = Iterators.cycle(nns);
|
this.nnLookup = Iterators.cycle(nns);
|
LOG.debug("logRollPeriodMs={} sleepTime={}.", logRollPeriodMs, sleepTimeMs);
LOG.debug("logRollPeriodMs=" + logRollPeriodMs +
|
" sleepTime=" + sleepTimeMs);
|
}
|
}
|
public void start() {
|
public void start() {
@ -360,9 +358,7 @@ public long doTailEdits() throws IOException, InterruptedException {
|||||||
currentLastTxnId, lastTxnId);
|
currentLastTxnId, lastTxnId);
return 0;
|
return 0;
}
|
}
if (LOG.isDebugEnabled()) {
|
LOG.debug("edit streams to load from: {}.", streams.size());
LOG.debug("edit streams to load from: " + streams.size());
|
}
|
|
// Once we have streams to load, errors encountered are legitimate cause
|
// Once we have streams to load, errors encountered are legitimate cause
// for concern, so we don't catch them here. Simple errors reading from
|
// for concern, so we don't catch them here. Simple errors reading from
@ -375,10 +371,7 @@ public long doTailEdits() throws IOException, InterruptedException {
|||||||
editsLoaded = elie.getNumEditsLoaded();
|
editsLoaded = elie.getNumEditsLoaded();
throw elie;
|
throw elie;
} finally {
|
} finally {
if (editsLoaded > 0 || LOG.isDebugEnabled()) {
|
LOG.debug("Loaded {} edits starting from txid {}.", editsLoaded, lastTxnId);
LOG.debug(String.format("Loaded %d edits starting from txid %d ",
|
editsLoaded, lastTxnId));
|
}
|
NameNode.getNameNodeMetrics().addNumEditLogLoaded(editsLoaded);
|
NameNode.getNameNodeMetrics().addNumEditLogLoaded(editsLoaded);
}
|
}
|
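Finally, the doTailEdits hunk above removes a guard that never did what it appeared to do: with DEBUG disabled, the old condition (editsLoaded > 0 || LOG.isDebugEnabled()) still led to a debug() call that printed nothing, yet String.format ran whenever edits had been loaded. The replacement is output-preserving and cheaper. A sketch of the equivalence:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class RedundantGuardSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(RedundantGuardSketch.class);

      void finishLoadOld(long editsLoaded, long lastTxnId) {
        // With DEBUG disabled this prints nothing, yet String.format
        // still runs whenever editsLoaded > 0.
        if (editsLoaded > 0 || LOG.isDebugEnabled()) {
          LOG.debug(String.format("Loaded %d edits starting from txid %d ",
              editsLoaded, lastTxnId));
        }
      }

      void finishLoadNew(long editsLoaded, long lastTxnId) {
        // Same visible output at every log level, no eager formatting.
        LOG.debug("Loaded {} edits starting from txid {}.",
            editsLoaded, lastTxnId);
      }
    }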