YARN-11465. Improved YarnClient Log Format (#5550)

Co-authored-by: yl09099 <shaq376260428@163.com>
Reviewed-by: Shilun Fan <slfan1989@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
yl09099 2023-04-17 09:27:52 +08:00 committed by GitHub
parent 0d1b4a3556
commit 2c4d6bf33d
9 changed files with 82 additions and 89 deletions
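The hunks below all apply one pattern: log statements that build their message with string concatenation are rewritten to use SLF4J {} placeholders, so the message is only formatted once the logger has confirmed the level is enabled. A minimal before/after sketch of the pattern (the class name and the containerId variable are illustrative, not taken from any one of the changed files):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogFormatSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(LogFormatSketch.class);

  void report(String containerId) {
    // Before: the message string is concatenated eagerly,
    // even when INFO logging is disabled.
    LOG.info("Processing Container " + containerId);

    // After: the {} placeholder defers formatting until the
    // logger has checked that INFO is enabled.
    LOG.info("Processing Container {}", containerId);
  }
}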

View File

@@ -69,17 +69,16 @@ public void onText(Session session, String message) throws IOException {
@OnWebSocketConnect
public void onConnect(Session s) {
initTerminal(s);
LOG.info(s.getRemoteAddress().getHostString() + " connected!");
LOG.info("{} connected!", s.getRemoteAddress().getHostString());
}
@OnWebSocketClose
public void onClose(Session session, int status, String reason) {
if (status==1000) {
LOG.info(session.getRemoteAddress().getHostString() +
" closed, status: " + status);
LOG.info("{} closed, status: {}", session.getRemoteAddress().getHostString(), status);
} else {
LOG.warn(session.getRemoteAddress().getHostString() +
" closed, status: " + status + " Reason: " + reason);
LOG.warn("{} closed, status:" +
" {} Reason: {}.", session.getRemoteAddress().getHostString(), status, reason);
}
}
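One detail in the onClose change above: the new warn format string is still split across two lines with +, but both operands are string literals, so the compiler folds them into a single constant and no concatenation happens per call. An illustrative sketch of the two equivalent forms, using the variables from the surrounding code:

// Folded into one constant format string at compile time:
LOG.warn("{} closed, status:" +
    " {} Reason: {}.", session.getRemoteAddress().getHostString(), status, reason);

// Identical at runtime:
LOG.warn("{} closed, status: {} Reason: {}.",
    session.getRemoteAddress().getHostString(), status, reason);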

View File

@@ -133,7 +133,7 @@ protected void serviceInit(Configuration conf) throws Exception {
this.maxThreadPoolSize = conf.getInt(
YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE,
YarnConfiguration.DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE);
LOG.info("Upper bound of the thread pool size is " + maxThreadPoolSize);
LOG.info("Upper bound of the thread pool size is {}.", maxThreadPoolSize);
client.init(conf);
super.serviceInit(conf);
@@ -186,9 +186,8 @@ public void run() {
// always increasing the pool-size
int newThreadPoolSize = Math.min(maxThreadPoolSize,
idealThreadPoolSize + INITIAL_THREAD_POOL_SIZE);
LOG.info("Set NMClientAsync thread pool size to " +
newThreadPoolSize + " as the number of nodes to talk to is "
+ nodeNum);
LOG.info("Set NMClientAsync thread pool size to {} " +
"as the number of nodes to talk to is {}.", newThreadPoolSize, nodeNum);
threadPool.setCorePoolSize(newThreadPoolSize);
}
}
@@ -252,8 +251,7 @@ public void startContainerAsync(
try {
events.put(new StartContainerEvent(container, containerLaunchContext));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of starting Container " +
container.getId());
LOG.warn("Exception when scheduling the event of starting Container {}", container.getId());
callbackHandler.onStartContainerError(container.getId(), e);
}
}
@@ -276,8 +274,8 @@ public void increaseContainerResourceAsync(Container container) {
try {
events.put(new UpdateContainerResourceEvent(container, true));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of increasing resource of "
+ "Container " + container.getId());
LOG.warn("Exception when scheduling the event of increasing " +
"resource of Container {}", container.getId());
handler.onIncreaseContainerResourceError(container.getId(), e);
}
}
@@ -300,8 +298,8 @@ public void updateContainerResourceAsync(Container container) {
try {
events.put(new UpdateContainerResourceEvent(container, false));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of increasing resource of "
+ "Container " + container.getId());
LOG.warn("Exception when scheduling the event of " +
"increasing resource of Container {}.", container.getId());
handler.onUpdateContainerResourceError(container.getId(), e);
}
}
@@ -325,8 +323,8 @@ public void reInitializeContainerAsync(ContainerId containerId,
client.getNodeIdOfStartedContainer(containerId),
containerLaunchContex, autoCommit));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of re-initializing of "
+ "Container " + containerId);
LOG.warn("Exception when scheduling the event of " +
"re-initializing of Container {}", containerId);
handler.onContainerReInitializeError(containerId, e);
}
}
@@ -349,8 +347,7 @@ public void restartContainerAsync(ContainerId containerId){
client.getNodeIdOfStartedContainer(containerId),
null, ContainerEventType.RESTART_CONTAINER));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of restart of "
+ "Container " + containerId);
LOG.warn("Exception when scheduling the event of restart of Container {}", containerId);
handler.onContainerRestartError(containerId, e);
}
}
@@ -373,8 +370,8 @@ public void rollbackLastReInitializationAsync(ContainerId containerId){
client.getNodeIdOfStartedContainer(containerId),
null, ContainerEventType.ROLLBACK_LAST_REINIT));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event Rollback re-initialization"
+ " of Container " + containerId);
LOG.warn("Exception when scheduling the event Rollback " +
"re-initialization of Container {}", containerId);
handler.onRollbackLastReInitializationError(containerId, e);
}
}
@@ -397,8 +394,8 @@ public void commitLastReInitializationAsync(ContainerId containerId){
client.getNodeIdOfStartedContainer(containerId),
null, ContainerEventType.COMMIT_LAST_REINT));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event Commit re-initialization"
+ " of Container " + containerId);
LOG.warn("Exception when scheduling the event " +
"Commit re-initialization of Container {}", containerId);
handler.onCommitLastReInitializationError(containerId, e);
}
}
@@ -413,8 +410,7 @@ public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
events.put(new ContainerEvent(containerId, nodeId, null,
ContainerEventType.STOP_CONTAINER));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of stopping Container " +
containerId);
LOG.warn("Exception when scheduling the event of stopping Container {}", containerId);
callbackHandler.onStopContainerError(containerId, e);
}
}
@@ -424,8 +420,8 @@ public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
events.put(new ContainerEvent(containerId, nodeId, null,
ContainerEventType.QUERY_CONTAINER));
} catch (InterruptedException e) {
LOG.warn("Exception when scheduling the event of querying the status" +
" of Container " + containerId);
LOG.warn("Exception when scheduling the event of querying " +
"the status of Container {}", containerId);
callbackHandler.onGetContainerStatusError(containerId, e);
}
}
@@ -730,7 +726,7 @@ public ContainerState transition(StatefulContainer container,
switch(containerEvent.getType()) {
case REINITIALIZE_CONTAINER:
if (!(containerEvent instanceof ReInitializeContainerEvevnt)) {
LOG.error("Unexpected Event.. [" +containerEvent.getType() + "]");
LOG.error("Unexpected Event.. [{}]", containerEvent.getType());
return ContainerState.FAILED;
}
ReInitializeContainerEvevnt rEvent =
@@ -771,8 +767,8 @@ public ContainerState transition(StatefulContainer container,
}
break;
default:
LOG.warn("Event of type [" + containerEvent.getType() + "] not" +
" expected here..");
LOG.warn("Event of type [{}] not" +
" expected here..", containerEvent.getType());
break;
}
if (handlerError != null) {
@@ -942,7 +938,7 @@ public ContainerEventProcessor(ContainerEvent event) {
@Override
public void run() {
ContainerId containerId = event.getContainerId();
LOG.info("Processing Event " + event + " for Container " + containerId);
LOG.info("Processing Event {} for Container {}", event, containerId);
if (event.getType() == ContainerEventType.QUERY_CONTAINER) {
try {
ContainerStatus containerStatus = client.getContainerStatus(
@@ -962,7 +958,7 @@ public void run() {
} else {
StatefulContainer container = containers.get(containerId);
if (container == null) {
LOG.info("Container " + containerId + " is already stopped or failed");
LOG.info("Container {} is already stopped or failed", containerId);
} else {
container.handle(event);
if (isCompletelyDone(container)) {
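Across the NMClientAsyncImpl hunks above, each catch block logs only the container id and forwards the InterruptedException to a callback handler. If the stack trace were also wanted in the log, SLF4J accepts a throwable as the final argument after the placeholder values; a sketch of that variant for the first hunk, not something this commit does:

try {
  events.put(new StartContainerEvent(container, containerLaunchContext));
} catch (InterruptedException e) {
  // The trailing exception argument is logged with its stack trace;
  // the single {} still consumes only container.getId().
  LOG.warn("Exception when scheduling the event of starting Container {}",
      container.getId(), e);
  callbackHandler.onStartContainerError(container.getId(), e);
}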

View File

@@ -478,11 +478,12 @@ protected void removePendingChangeRequests(
continue;
}
if (LOG.isDebugEnabled()) {
LOG.debug("RM has confirmed changed resource allocation for "
+ "container " + containerId + ". Current resource allocation:"
+ changedContainer.getContainer().getResource()
+ ". Remove pending change request:"
+ pendingChange.get(containerId).getValue());
LOG.debug("RM has confirmed changed resource allocation for container {}. " +
"Current resource allocation:{}. " +
"Remove pending change request:{}",
containerId,
changedContainer.getContainer().getResource(),
pendingChange.get(containerId).getValue());
}
pendingChange.remove(containerId);
}
@@ -495,9 +496,9 @@ protected void populateNMTokens(List<NMToken> nmTokens) {
String nodeId = token.getNodeId().toString();
if (LOG.isDebugEnabled()) {
if (getNMTokenCache().containsToken(nodeId)) {
LOG.debug("Replacing token for : " + nodeId);
LOG.debug("Replacing token for : {}", nodeId);
} else {
LOG.debug("Received new token for : " + nodeId);
LOG.debug("Received new token for : {}", nodeId);
}
}
getNMTokenCache().setToken(nodeId, token.getToken());
@@ -544,8 +545,7 @@ public synchronized void addContainerRequest(T req) {
dedupedRacks.addAll(req.getRacks());
if(req.getRacks().size() != dedupedRacks.size()) {
Joiner joiner = Joiner.on(',');
LOG.warn("ContainerRequest has duplicate racks: "
+ joiner.join(req.getRacks()));
LOG.warn("ContainerRequest has duplicate racks: {}", joiner.join(req.getRacks()));
}
}
Set<String> inferredRacks = resolveRacks(req.getNodes());
@@ -573,8 +573,7 @@ public synchronized void addContainerRequest(T req) {
HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes());
if(dedupedNodes.size() != req.getNodes().size()) {
Joiner joiner = Joiner.on(',');
LOG.warn("ContainerRequest has duplicate nodes: "
+ joiner.join(req.getNodes()));
LOG.warn("ContainerRequest has duplicate nodes: {}", joiner.join(req.getNodes()));
}
for (String node : dedupedNodes) {
addResourceRequest(req.getPriority(), node,
@@ -636,11 +635,12 @@ public synchronized void requestContainerUpdate(
Preconditions.checkNotNull(container, "Container cannot be null!!");
Preconditions.checkNotNull(updateContainerRequest,
"UpdateContainerRequest cannot be null!!");
LOG.info("Requesting Container update : " +
"container=" + container + ", " +
"updateType=" + updateContainerRequest.getContainerUpdateType() + ", " +
"targetCapability=" + updateContainerRequest.getCapability() + ", " +
"targetExecType=" + updateContainerRequest.getExecutionType());
LOG.info("Requesting Container update : container={}, updateType={}," +
" targetCapability={}, targetExecType={}",
container,
updateContainerRequest.getContainerUpdateType(),
updateContainerRequest.getCapability(),
updateContainerRequest.getExecutionType());
if (updateContainerRequest.getCapability() != null &&
updateContainerRequest.getExecutionType() == null) {
validateContainerResourceChangeRequest(
@@ -770,7 +770,7 @@ private Set<String> resolveRacks(List<String> nodes) {
// Ensure node requests are accompanied by requests for
// corresponding rack
if (rack == null) {
LOG.warn("Failed to resolve rack for node " + node + ".");
LOG.warn("Failed to resolve rack for node {}.", node);
} else {
racks.add(rack);
}
@@ -941,12 +941,13 @@ private void addResourceRequest(Priority priority, String resourceName,
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest);
LOG.debug("addResourceRequest:" + " applicationId="
+ " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
+ resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size());
LOG.debug("Adding request to ask {}", resourceRequestInfo.remoteRequest);
LOG.debug("addResourceRequest: applicationId= priority={}" +
" resourceName={} numContainers={} #asks={}",
priority.getPriority(),
resourceName,
resourceRequestInfo.remoteRequest.getNumContainers(),
ask.size());
}
}
@@ -972,17 +973,16 @@ private void decResourceRequest(Priority priority, String resourceName,
}
if (LOG.isDebugEnabled()) {
LOG.debug("AFTER decResourceRequest:"
+ " allocationRequestId=" + req.getAllocationRequestId()
+ " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
+ resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size());
LOG.debug("AFTER decResourceRequest: allocationRequestId={} " +
"priority={} resourceName={} numContainers={} #asks={}",
req.getAllocationRequestId(), priority.getPriority(),
resourceName,
resourceRequestInfo.remoteRequest.getNumContainers(), ask.size());
}
}
} else {
LOG.info("No remoteRequestTable found with allocationRequestId="
+ req.getAllocationRequestId());
LOG.info("No remoteRequestTable found with allocationRequestId={}",
req.getAllocationRequestId());
}
}
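The AMRMClientImpl hunks keep their LOG.isDebugEnabled() guards even after switching to placeholders. That is deliberate: placeholders defer string formatting, but the argument expressions themselves are still evaluated before the call, so the guard is what skips work such as the map lookup below when DEBUG is off. A sketch of the distinction, using names from the hunk above:

if (LOG.isDebugEnabled()) {
  // Without the guard, pendingChange.get(containerId).getValue() would
  // still run on every call; only the message formatting is deferred.
  LOG.debug("Remove pending change request:{}",
      pendingChange.get(containerId).getValue());
}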

View File

@@ -128,13 +128,11 @@ protected synchronized void cleanupRunningContainers() {
stopContainer(startedContainer.getContainerId(),
startedContainer.getNodeId());
} catch (YarnException e) {
LOG.error("Failed to stop Container " +
startedContainer.getContainerId() +
" when stopping NMClientImpl");
LOG.error("Failed to stop Container {} when stopping NMClientImpl",
startedContainer.getContainerId());
} catch (IOException e) {
LOG.error("Failed to stop Container " +
startedContainer.getContainerId() +
" when stopping NMClientImpl");
LOG.error("Failed to stop Container {} when stopping NMClientImpl",
startedContainer.getContainerId());
}
}
}
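In the cleanupRunningContainers hunk above, the YarnException and IOException branches now log the same message. If the two branches never need to diverge, a multi-catch would express that more compactly; this is a sketch only, not a change this commit makes:

try {
  stopContainer(startedContainer.getContainerId(),
      startedContainer.getNodeId());
} catch (YarnException | IOException e) {
  // One handler for both checked exceptions, same parameterized message.
  LOG.error("Failed to stop Container {} when stopping NMClientImpl",
      startedContainer.getContainerId());
}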

View File

@@ -353,7 +353,7 @@ public YarnClientApplication createApplication()
throw new YarnException("Failed to submit " + applicationId +
" to YARN : " + appReport.getDiagnostics());
}
LOG.info("Submitted application " + applicationId);
LOG.info("Submitted application {}", applicationId);
break;
}
@@ -368,8 +368,9 @@ public YarnClientApplication createApplication()
// is blocked here too long.
if (++pollCount % 10 == 0) {
LOG.info("Application submission is not finished, " +
"submitted application " + applicationId +
" is still in " + state);
"submitted application {} is still in {}",
applicationId,
state);
}
try {
Thread.sleep(submitPollIntervalMillis);
@@ -382,8 +383,8 @@ public YarnClientApplication createApplication()
} catch (ApplicationNotFoundException ex) {
// FailOver or RM restart happens before RMStateStore saves
// ApplicationState
LOG.info("Re-submit application " + applicationId + "with the " +
"same ApplicationSubmissionContext");
LOG.info("Re-submit application {} with the" +
" same ApplicationSubmissionContext", applicationId);
rmClient.submitApplication(request);
}
}
@@ -408,7 +409,7 @@ private void addLogAggregationDelegationToken(
throw new IOException(
"Can't get Master Kerberos principal for use as renewer");
}
LOG.debug("Delegation Token Renewer: " + masterPrincipal);
LOG.debug("Delegation Token Renewer: {}", masterPrincipal);
LogAggregationFileControllerFactory factory =
new LogAggregationFileControllerFactory(conf);
@@ -421,8 +422,7 @@ private void addLogAggregationDelegationToken(
fs.addDelegationTokens(masterPrincipal, credentials);
if (finalTokens != null) {
for (org.apache.hadoop.security.token.Token<?> token : finalTokens) {
LOG.info("Added delegation token for log aggregation path "
+ remoteRootLogDir + "; "+token);
LOG.info("Added delegation token for log aggregation path {}; {}", remoteRootLogDir, token);
}
}
@@ -485,8 +485,7 @@ private void addTimelineDelegationToken(
return timelineClient.getDelegationToken(timelineDTRenewer);
} catch (Exception e) {
if (timelineServiceBestEffort) {
LOG.warn("Failed to get delegation token from the timeline server: "
+ e.getMessage());
LOG.warn("Failed to get delegation token from the timeline server: {}", e.getMessage());
return null;
}
throw new IOException(e);
@@ -527,7 +526,7 @@ protected boolean isSecurityEnabled() {
@Override
public void failApplicationAttempt(ApplicationAttemptId attemptId)
throws YarnException, IOException {
LOG.info("Failing application attempt " + attemptId);
LOG.info("Failing application attempt {}.", attemptId);
FailApplicationAttemptRequest request =
Records.newRecord(FailApplicationAttemptRequest.class);
request.setApplicationAttemptId(attemptId);
@@ -560,7 +559,7 @@ public void killApplication(ApplicationId applicationId, String diagnostics)
KillApplicationResponse response =
rmClient.forceKillApplication(request);
if (response.getIsKillCompleted()) {
LOG.info("Killed application " + applicationId);
LOG.info("Killed application {}", applicationId);
break;
}
@@ -573,7 +572,7 @@ public void killApplication(ApplicationId applicationId, String diagnostics)
if (++pollCount % 10 == 0) {
LOG.info(
"Waiting for application " + applicationId + " to be killed.");
"Waiting for application {} to be killed.", applicationId);
}
Thread.sleep(asyncApiPollIntervalMillis);
}
@@ -1080,7 +1079,7 @@ public Priority updateApplicationPriority(ApplicationId applicationId,
public void signalToContainer(ContainerId containerId,
SignalContainerCommand command)
throws YarnException, IOException {
LOG.info("Signalling container " + containerId + " with command " + command);
LOG.info("Signalling container {} with command {}", containerId, command);
SignalContainerRequest request =
SignalContainerRequest.newInstance(containerId, command);
rmClient.signalToContainer(request);
@@ -1186,9 +1185,9 @@ public void shellToContainer(ContainerId containerId,
client.stop();
}
} catch (WebSocketException e) {
LOG.debug("Websocket exception: " + e.getMessage());
LOG.debug("Websocket exception: {}", e.getMessage());
} catch (Throwable t) {
LOG.error("Fail to shell to container: " + t.getMessage());
LOG.error("Fail to shell to container: {}", t.getMessage());
}
}
}
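A few of the YarnClientImpl changes above log only the exception message, for example the WebSocketException and the generic Throwable in shellToContainer, which matches the old behaviour. If the stack trace were wanted, the throwable itself could be passed as the trailing argument instead; a sketch of that alternative, not what this commit does:

} catch (WebSocketException e) {
  // As in the patch: message only.
  LOG.debug("Websocket exception: {}", e.getMessage());
} catch (Throwable t) {
  // Alternative: pass the throwable so SLF4J logs the stack trace too.
  LOG.error("Fail to shell to container", t);
}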

View File

@@ -79,7 +79,7 @@ public void run() {
boolean rmStarted = rmStartedSignal.await(60000L, TimeUnit.MILLISECONDS);
Assert.assertTrue("ResourceManager failed to start up.", rmStarted);
LOG.info("ResourceManager RMAdmin address: " +
LOG.info("ResourceManager RMAdmin address: {}.",
conf.get(YarnConfiguration.RM_ADMIN_ADDRESS));
}

View File

@@ -189,6 +189,7 @@ public void testFailoverChange() throws Exception {
* and {@link AutoRefreshRMFailoverProxyProvider#performFailover(Object)}
* gets called.
*/
@SuppressWarnings("unchecked")
@Test
public void testAutoRefreshFailoverChange() throws Exception {
conf.setClass(YarnConfiguration.CLIENT_FAILOVER_NO_HA_PROXY_PROVIDER,

View File

@@ -106,8 +106,8 @@ public void run() {
boolean rmStarted = rmStartedSignal.await(60000L, TimeUnit.MILLISECONDS);
Assert.assertTrue("ResourceManager failed to start up.", rmStarted);
LOG.info("ResourceManager RMAdmin address: "
+ configuration.get(YarnConfiguration.RM_ADMIN_ADDRESS));
LOG.info("ResourceManager RMAdmin address: {}.",
configuration.get(YarnConfiguration.RM_ADMIN_ADDRESS));
client = new ResourceManagerAdministrationProtocolPBClientImpl(1L,
getProtocolAddress(configuration), configuration);

View File

@@ -76,7 +76,7 @@ public static void afterClass() {
localFs.close();
}
} catch (IOException ioe) {
LOG.info("IO exception in closing file system)");
LOG.info("IO exception in closing file system");
ioe.printStackTrace();
}
}
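The final hunk leaves ioe.printStackTrace() next to the LOG.info call, so the stack trace still goes to stderr rather than through the logger. If the two were to be combined, the exception could be handed to the logger directly; a sketch only, not part of this commit:

} catch (IOException ioe) {
  // Logs the message and the stack trace through SLF4J in one call.
  LOG.info("IO exception in closing file system", ioe);
}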