HDFS-17018. Improve dfsclient log format. (#5668). Contributed by Xianming Lei.
Reviewed-by: Shilun Fan <slfan1989@apache.org> Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
parent
4627242c44
commit
441fb23293
@ -349,9 +349,8 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
|
||||
|
||||
if (numResponseToDrop > 0) {
|
||||
// This case is used for testing.
|
||||
LOG.warn(DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
|
||||
+ " is set to " + numResponseToDrop
|
||||
+ ", this hacked client will proactively drop responses");
|
||||
LOG.warn("{} is set to {} , this hacked client will proactively drop responses",
|
||||
DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY, numResponseToDrop);
|
||||
proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,
|
||||
nameNodeUri, ClientProtocol.class, numResponseToDrop,
|
||||
nnFallbackToSimpleAuth);
|
||||
@ -378,9 +377,9 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
|
||||
conf.getTrimmedStrings(DFS_CLIENT_LOCAL_INTERFACES);
|
||||
localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
|
||||
if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
|
||||
LOG.debug("Using local interfaces [" +
|
||||
Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
|
||||
Joiner.on(',').join(localInterfaceAddrs) + "]");
|
||||
LOG.debug("Using local interfaces [{}] with addresses [{}]",
|
||||
Joiner.on(',').join(localInterfaces),
|
||||
Joiner.on(',').join(localInterfaceAddrs));
|
||||
}
|
||||
|
||||
Boolean readDropBehind =
|
||||
@ -623,10 +622,9 @@ public boolean renewLease() throws IOException {
|
||||
// Abort if the lease has already expired.
|
||||
final long elapsed = Time.monotonicNow() - getLastLeaseRenewal();
|
||||
if (elapsed > dfsClientConf.getleaseHardLimitPeriod()) {
|
||||
LOG.warn("Failed to renew lease for " + clientName + " for "
|
||||
+ (elapsed/1000) + " seconds (>= hard-limit ="
|
||||
+ (dfsClientConf.getleaseHardLimitPeriod() / 1000) + " seconds.) "
|
||||
+ "Closing all files being written ...", e);
|
||||
LOG.warn("Failed to renew lease for {} for {} seconds (>= hard-limit ={} seconds.) "
|
||||
+ "Closing all files being written ...", clientName, (elapsed/1000),
|
||||
(dfsClientConf.getleaseHardLimitPeriod() / 1000), e);
|
||||
closeAllFilesBeingWritten(true);
|
||||
} else {
|
||||
// Let the lease renewer handle it and retry.
|
||||
@ -664,8 +662,8 @@ public void closeAllFilesBeingWritten(final boolean abort) {
|
||||
out.close();
|
||||
}
|
||||
} catch(IOException ie) {
|
||||
LOG.error("Failed to " + (abort ? "abort" : "close") + " file: "
|
||||
+ out.getSrc() + " with renewLeaseKey: " + key, ie);
|
||||
LOG.error("Failed to {} file: {} with renewLeaseKey: {}",
|
||||
(abort ? "abort" : "close"), out.getSrc(), key, ie);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -757,9 +755,9 @@ public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
|
||||
namenode.getDelegationToken(renewer);
|
||||
if (token != null) {
|
||||
token.setService(this.dtService);
|
||||
LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token));
|
||||
LOG.info("Created {}", DelegationTokenIdentifier.stringifyToken(token));
|
||||
} else {
|
||||
LOG.info("Cannot get delegation token from " + renewer);
|
||||
LOG.info("Cannot get delegation token from {}", renewer);
|
||||
}
|
||||
return token;
|
||||
}
|
||||
@ -775,7 +773,7 @@ public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
|
||||
@Deprecated
|
||||
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
|
||||
throws IOException {
|
||||
LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token));
|
||||
LOG.info("Renewing {}", DelegationTokenIdentifier.stringifyToken(token));
|
||||
try {
|
||||
return token.renew(conf);
|
||||
} catch (InterruptedException ie) {
|
||||
@ -795,7 +793,7 @@ public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
|
||||
@Deprecated
|
||||
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
|
||||
throws IOException {
|
||||
LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token));
|
||||
LOG.info("Cancelling {}", DelegationTokenIdentifier.stringifyToken(token));
|
||||
try {
|
||||
token.cancel(conf);
|
||||
} catch (InterruptedException ie) {
|
||||
@ -839,8 +837,7 @@ public long renew(Token<?> token, Configuration conf) throws IOException {
|
||||
public void cancel(Token<?> token, Configuration conf) throws IOException {
|
||||
Token<DelegationTokenIdentifier> delToken =
|
||||
(Token<DelegationTokenIdentifier>) token;
|
||||
LOG.info("Cancelling " +
|
||||
DelegationTokenIdentifier.stringifyToken(delToken));
|
||||
LOG.info("Cancelling {}", DelegationTokenIdentifier.stringifyToken(delToken));
|
||||
ClientProtocol nn = getNNProxy(delToken, conf);
|
||||
try {
|
||||
nn.cancelDelegationToken(delToken);
|
||||
@ -2709,8 +2706,8 @@ void reportChecksumFailure(String file, LocatedBlock lblocks[]) {
|
||||
try {
|
||||
reportBadBlocks(lblocks);
|
||||
} catch (IOException ie) {
|
||||
LOG.info("Found corruption while reading " + file
|
||||
+ ". Error repairing corrupt blocks. Bad blocks remain.", ie);
|
||||
LOG.info("Found corruption while reading {}"
|
||||
+ ". Error repairing corrupt blocks. Bad blocks remain.", file, ie);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -400,10 +400,10 @@ private Set<StripedDataStreamer> checkStreamers() throws IOException {
|
||||
|
||||
final int failCount = failedStreamers.size() + newFailed.size();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("checkStreamers: " + streamers);
|
||||
LOG.debug("healthy streamer count=" + (numAllBlocks - failCount));
|
||||
LOG.debug("original failed streamers: " + failedStreamers);
|
||||
LOG.debug("newly failed streamers: " + newFailed);
|
||||
LOG.debug("checkStreamers: {}", streamers);
|
||||
LOG.debug("healthy streamer count={}", (numAllBlocks - failCount));
|
||||
LOG.debug("original failed streamers: {}", failedStreamers);
|
||||
LOG.debug("newly failed streamers: {}", newFailed);
|
||||
}
|
||||
if (failCount > (numAllBlocks - numDataBlocks)) {
|
||||
closeAllStreamers();
|
||||
|
@ -595,7 +595,7 @@ private static String checkRpcAuxiliary(Configuration conf, String suffix,
|
||||
if (ports == null || ports.length == 0) {
|
||||
return address;
|
||||
}
|
||||
LOG.info("Using server auxiliary ports " + Arrays.toString(ports));
|
||||
LOG.info("Using server auxiliary ports {}", Arrays.toString(ports));
|
||||
URI uri;
|
||||
try {
|
||||
uri = new URI(address);
|
||||
@ -604,7 +604,7 @@ private static String checkRpcAuxiliary(Configuration conf, String suffix,
|
||||
// happens in unit test, as MiniDFSCluster sets the value to
|
||||
// 127.0.0.1:0, without schema (i.e. "hdfs://"). While in practice, this
|
||||
// should not be the case. So log a warning message here.
|
||||
LOG.warn("NameNode address is not a valid uri:" + address);
|
||||
LOG.warn("NameNode address is not a valid uri:{}", address);
|
||||
return address;
|
||||
}
|
||||
// Ignore the port, only take the schema(e.g. hdfs) and host (e.g.
|
||||
@ -1056,8 +1056,8 @@ public Thread newThread(Runnable r) {
|
||||
@Override
|
||||
public void rejectedExecution(Runnable runnable,
|
||||
ThreadPoolExecutor e) {
|
||||
LOG.info(threadNamePrefix + " task is rejected by " +
|
||||
"ThreadPoolExecutor. Executing it in current thread.");
|
||||
LOG.info("{} task is rejected by " +
|
||||
"ThreadPoolExecutor. Executing it in current thread.", threadNamePrefix);
|
||||
// will run in the current thread
|
||||
super.rejectedExecution(runnable, e);
|
||||
}
|
||||
|
@ -67,12 +67,12 @@ public static boolean isHealthy(URI uri) {
|
||||
(DistributedFileSystem) FileSystem.get(uri, conf)) {
|
||||
final boolean safemode = fs.setSafeMode(SafeModeAction.SAFEMODE_GET);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Is namenode in safemode? " + safemode + "; uri=" + uri);
|
||||
LOG.debug("Is namenode in safemode? {}; uri={}", safemode, uri);
|
||||
}
|
||||
return !safemode;
|
||||
} catch (IOException e) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Got an exception for uri=" + uri, e);
|
||||
LOG.debug("Got an exception for uri={}", uri, e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -412,7 +412,7 @@ private BlockReader tryToCreateExternalBlockReader() {
|
||||
return new ExternalBlockReader(accessor, visibleLength, startOffset);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Failed to construct new object of type " +
|
||||
LOG.warn("Failed to construct new object of type {}",
|
||||
cls.getName(), t);
|
||||
}
|
||||
}
|
||||
@ -458,8 +458,8 @@ private BlockReader getLegacyBlockReaderLocal() throws IOException {
|
||||
// which requires us to disable legacy SCR.
|
||||
throw ioe;
|
||||
}
|
||||
LOG.warn(this + ": error creating legacy BlockReaderLocal. " +
|
||||
"Disabling legacy local reads.", ioe);
|
||||
LOG.warn("{}: error creating legacy BlockReaderLocal. " +
|
||||
"Disabling legacy local reads.", this, ioe);
|
||||
clientContext.setDisableLegacyBlockReaderLocal();
|
||||
return null;
|
||||
}
|
||||
@ -558,8 +558,8 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
|
||||
// Handle an I/O error we got when using a newly created socket.
|
||||
// We temporarily disable the domain socket path for a few minutes in
|
||||
// this case, to prevent wasting more time on it.
|
||||
LOG.warn(this + ": I/O error requesting file descriptors. " +
|
||||
"Disabling domain socket " + peer.getDomainSocket(), e);
|
||||
LOG.warn("{}: I/O error requesting file descriptors. " +
|
||||
"Disabling domain socket {}", this, peer.getDomainSocket(), e);
|
||||
IOUtilsClient.cleanupWithLogger(LOG, peer);
|
||||
clientContext.getDomainSocketFactory()
|
||||
.disableDomainSocketPath(pathInfo.getPath());
|
||||
@ -621,7 +621,7 @@ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
|
||||
// This indicates an error reading from disk, or a format error. Since
|
||||
// it's not a socket communication problem, we return null rather than
|
||||
// throwing an exception.
|
||||
LOG.warn(this + ": error creating ShortCircuitReplica.", e);
|
||||
LOG.warn("{}: error creating ShortCircuitReplica.", this, e);
|
||||
return null;
|
||||
} finally {
|
||||
if (replica == null) {
|
||||
@ -631,13 +631,13 @@ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
|
||||
case ERROR_UNSUPPORTED:
|
||||
if (!resp.hasShortCircuitAccessVersion()) {
|
||||
LOG.warn("short-circuit read access is disabled for " +
|
||||
"DataNode " + datanode + ". reason: " + resp.getMessage());
|
||||
"DataNode {}. reason: {}", datanode, resp.getMessage());
|
||||
clientContext.getDomainSocketFactory()
|
||||
.disableShortCircuitForPath(pathInfo.getPath());
|
||||
} else {
|
||||
LOG.warn("short-circuit read access for the file " +
|
||||
fileName + " is disabled for DataNode " + datanode +
|
||||
". reason: " + resp.getMessage());
|
||||
"{} is disabled for DataNode {}" +
|
||||
". reason: {}", fileName, datanode, resp.getMessage());
|
||||
}
|
||||
return null;
|
||||
case ERROR_ACCESS_TOKEN:
|
||||
@ -716,7 +716,7 @@ private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
|
||||
// We temporarily disable the domain socket path for a few minutes in
|
||||
// this case, to prevent wasting more time on it.
|
||||
LOG.warn("I/O error constructing remote block reader. Disabling " +
|
||||
"domain socket " + peer.getDomainSocket(), ioe);
|
||||
"domain socket {}", peer.getDomainSocket(), ioe);
|
||||
clientContext.getDomainSocketFactory()
|
||||
.disableDomainSocketPath(pathInfo.getPath());
|
||||
return null;
|
||||
|
@ -244,9 +244,9 @@ static BlockReaderLocalLegacy newBlockReader(DfsClientConf conf,
|
||||
} catch (IOException e) {
|
||||
// remove from cache
|
||||
localDatanodeInfo.removeBlockLocalPathInfo(blk);
|
||||
LOG.warn("BlockReaderLocalLegacy: Removing " + blk
|
||||
+ " from cache because local file " + pathinfo.getBlockPath()
|
||||
+ " could not be opened.");
|
||||
LOG.warn("BlockReaderLocalLegacy: Removing {}"
|
||||
+ " from cache because local file {}"
|
||||
+ " could not be opened.", blk, pathinfo.getBlockPath());
|
||||
throw e;
|
||||
} finally {
|
||||
if (localBlockReader == null) {
|
||||
|
@ -356,7 +356,7 @@ private ByteArrayManager.Conf loadWriteByteArrayManagerConf(
|
||||
classLoader.loadClass(className);
|
||||
classes.add(cls);
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Unable to load " + className, t);
|
||||
LOG.warn("Unable to load {}", className, t);
|
||||
}
|
||||
}
|
||||
return classes;
|
||||
|
@ -330,8 +330,8 @@ public synchronized boolean put(final DFSClient dfsc) {
|
||||
public void run() {
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Lease renewer daemon for " + clientsString()
|
||||
+ " with renew id " + id + " started");
|
||||
LOG.debug("Lease renewer daemon for {} with renew id {} started",
|
||||
clientsString(), id);
|
||||
}
|
||||
LeaseRenewer.this.run(id);
|
||||
} catch(InterruptedException e) {
|
||||
@ -341,8 +341,8 @@ public void run() {
|
||||
Factory.INSTANCE.remove(LeaseRenewer.this);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Lease renewer daemon for " + clientsString()
|
||||
+ " with renew id " + id + " exited");
|
||||
LOG.debug("Lease renewer daemon for {} with renew id {} exited",
|
||||
clientsString(), id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -444,13 +444,13 @@ private void run(final int id) throws InterruptedException {
|
||||
try {
|
||||
renew();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Lease renewer daemon for " + clientsString()
|
||||
+ " with renew id " + id + " executed");
|
||||
LOG.debug("Lease renewer daemon for {} with renew id {} executed",
|
||||
clientsString(), id);
|
||||
}
|
||||
lastRenewed = Time.monotonicNow();
|
||||
} catch (SocketTimeoutException ie) {
|
||||
LOG.warn("Failed to renew lease for " + clientsString() + " for "
|
||||
+ (elapsed/1000) + " seconds. Aborting ...", ie);
|
||||
LOG.warn("Failed to renew lease for {} for {} seconds. Aborting ...",
|
||||
clientsString(), (elapsed/1000), ie);
|
||||
List<DFSClient> dfsclientsCopy;
|
||||
synchronized (this) {
|
||||
DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout();
|
||||
@ -462,8 +462,8 @@ private void run(final int id) throws InterruptedException {
|
||||
}
|
||||
break;
|
||||
} catch (IOException ie) {
|
||||
LOG.warn("Failed to renew lease for " + clientsString() + " for "
|
||||
+ (elapsed/1000) + " seconds. Will retry shortly ...", ie);
|
||||
LOG.warn("Failed to renew lease for {} for {} seconds. Will retry shortly ...",
|
||||
clientsString(), (elapsed/1000), ie);
|
||||
}
|
||||
}
|
||||
|
||||
@ -471,11 +471,11 @@ private void run(final int id) throws InterruptedException {
|
||||
if (id != currentId || isRenewerExpired()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (id != currentId) {
|
||||
LOG.debug("Lease renewer daemon for " + clientsString()
|
||||
+ " with renew id " + id + " is not current");
|
||||
LOG.debug("Lease renewer daemon for {} with renew id {} is not current",
|
||||
clientsString(), id);
|
||||
} else {
|
||||
LOG.debug("Lease renewer daemon for " + clientsString()
|
||||
+ " with renew id " + id + " expired");
|
||||
LOG.debug("Lease renewer daemon for {} with renew id {} expired",
|
||||
clientsString(), id);
|
||||
}
|
||||
}
|
||||
//no longer the current daemon or expired
|
||||
|
Loading…
Reference in New Issue
Block a user