HDFS-16086. Add volume information to datanode log for tracing (#3136)
This commit is contained in:
parent
7c999e2d9a
commit
56c7ada7a5
@ -33,6 +33,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.Replica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.thirdparty.protobuf.ByteString;
|
||||
@ -523,6 +524,7 @@ public void remove() {
|
||||
@InterfaceAudience.Private
|
||||
public static class BlockReportReplica extends Block implements Replica {
|
||||
private ReplicaState state;
|
||||
|
||||
private BlockReportReplica() {
|
||||
}
|
||||
public BlockReportReplica(Block block) {
|
||||
@ -557,6 +559,10 @@ public boolean isOnTransientStorage() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
@Override
|
||||
public FsVolumeSpi getVolume() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
return super.equals(o);
|
||||
}
|
||||
|
@ -1551,11 +1551,12 @@ private void finalizeBlock(long startTime) throws IOException {
|
||||
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block
|
||||
.getBlockPoolId());
|
||||
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT, inAddr,
|
||||
myAddr, block.getNumBytes(), "HDFS_WRITE", clientname, offset,
|
||||
dnR.getDatanodeUuid(), block, endTime - startTime));
|
||||
myAddr, replicaInfo.getVolume(), block.getNumBytes(),
|
||||
"HDFS_WRITE", clientname, offset, dnR.getDatanodeUuid(),
|
||||
block, endTime - startTime));
|
||||
} else {
|
||||
LOG.info("Received " + block + " size " + block.getNumBytes()
|
||||
+ " from " + inAddr);
|
||||
LOG.info("Received " + block + " on volume " + replicaInfo.getVolume()
|
||||
+ " size " + block.getNumBytes() + " from " + inAddr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -268,6 +268,7 @@ public class DataNode extends ReconfigurableBase
|
||||
public static final String DN_CLIENTTRACE_FORMAT =
|
||||
"src: %s" + // src IP
|
||||
", dest: %s" + // dst IP
|
||||
", volume: %s" + // volume
|
||||
", bytes: %s" + // byte count
|
||||
", op: %s" + // operation
|
||||
", cliID: %s" + // DFSClient id
|
||||
|
@ -587,7 +587,7 @@ public void readBlock(final ExtendedBlock block,
|
||||
final String clientTraceFmt =
|
||||
clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
|
||||
? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
|
||||
"%d", "HDFS_READ", clientName, "%d",
|
||||
"", "%d", "HDFS_READ", clientName, "%d",
|
||||
dnR.getDatanodeUuid(), block, "%d")
|
||||
: dnR + " Served block " + block + " to " +
|
||||
remoteAddress;
|
||||
@ -929,8 +929,9 @@ public void writeBlock(final ExtendedBlock block,
|
||||
if (isDatanode ||
|
||||
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
|
||||
datanode.closeBlock(block, null, storageUuid, isOnTransientStorage);
|
||||
LOG.info("Received {} src: {} dest: {} of size {}",
|
||||
block, remoteAddress, localAddress, block.getNumBytes());
|
||||
LOG.info("Received {} src: {} dest: {} volume: {} of size {}",
|
||||
block, remoteAddress, localAddress, replica.getVolume(),
|
||||
block.getNumBytes());
|
||||
}
|
||||
|
||||
if(isClient) {
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
|
||||
/**
|
||||
* This represents block replicas which are stored in DataNode.
|
||||
@ -64,4 +65,10 @@ public interface Replica {
|
||||
* Return true if the target volume is backed by RAM.
|
||||
*/
|
||||
public boolean isOnTransientStorage();
|
||||
|
||||
/**
|
||||
* Get the volume of replica.
|
||||
* @return the volume of replica
|
||||
*/
|
||||
public FsVolumeSpi getVolume();
|
||||
}
|
||||
|
@ -232,7 +232,8 @@ public void run() {
|
||||
void deleteAsync(FsVolumeReference volumeRef, ReplicaInfo replicaToDelete,
|
||||
ExtendedBlock block, String trashDirectory) {
|
||||
LOG.info("Scheduling " + block.getLocalBlock()
|
||||
+ " replica " + replicaToDelete + " for deletion");
|
||||
+ " replica " + replicaToDelete + " on volume " +
|
||||
replicaToDelete.getVolume() + " for deletion");
|
||||
ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
|
||||
volumeRef, replicaToDelete, block, trashDirectory);
|
||||
execute(((FsVolumeImpl) volumeRef.getVolume()), deletionTask);
|
||||
|
@ -1129,6 +1129,8 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
|
||||
}
|
||||
}
|
||||
try {
|
||||
LOG.debug("moving block {} from {} to {}", block,
|
||||
replicaInfo.getVolume(), volumeRef.getVolume());
|
||||
moveBlock(block, replicaInfo, volumeRef, useVolumeOnSameMount);
|
||||
datanode.getMetrics().incrReplaceBlockOpOnSameHost();
|
||||
if (useVolumeOnSameMount) {
|
||||
@ -1631,6 +1633,7 @@ public ReplicaHandler createRbw(
|
||||
if (ref == null) {
|
||||
ref = volumes.getNextVolume(storageType, storageId, b.getNumBytes());
|
||||
}
|
||||
LOG.debug("Creating Rbw, block: {} on volume: {}", b, ref.getVolume());
|
||||
|
||||
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
|
||||
// create an rbw file to hold block in the designated volume
|
||||
@ -1904,6 +1907,8 @@ public ReplicaHandler createTemporary(StorageType storageType,
|
||||
ReplicaInPipeline newReplicaInfo;
|
||||
try {
|
||||
newReplicaInfo = v.createTemporary(b);
|
||||
LOG.debug("creating temporary for block: {} on volume: {}",
|
||||
b, ref.getVolume());
|
||||
} catch (IOException e) {
|
||||
IOUtils.cleanupWithLogger(null, ref);
|
||||
throw e;
|
||||
|
@ -416,6 +416,11 @@ public void waitForMinLength(long minLength, long time, TimeUnit unit)
|
||||
} while (deadLine > System.currentTimeMillis());
|
||||
throw new IOException("Minimum length was not achieved within timeout");
|
||||
}
|
||||
|
||||
@Override
|
||||
public FsVolumeSpi getVolume() {
|
||||
return getStorage(theBlock).getVolume();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.Replica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
|
||||
public class ExternalReplica implements Replica {
|
||||
|
||||
@ -62,4 +63,9 @@ public String getStorageUuid() {
|
||||
public boolean isOnTransientStorage() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FsVolumeSpi getVolume() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.hdfs.server.datanode.ChunkChecksum;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
@ -135,4 +136,9 @@ public void interruptThread() {
|
||||
public void waitForMinLength(long minLength, long time, TimeUnit unit)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public FsVolumeSpi getVolume() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user