HDFS-12255. Block Storage: Cblock should generated unique trace ID for the ops. Contributed by Mukul Kumar Singh.

This commit is contained in:
Chen Liang 2017-08-15 10:57:11 -07:00 committed by Owen O'Malley
parent e61530f5d9
commit 6a16d7c7ab
4 changed files with 46 additions and 47 deletions

View File

@ -93,9 +93,9 @@ public void run() {
long endTime = Time.monotonicNow();
Preconditions.checkState(data.length > 0, "Block data is zero length");
startTime = Time.monotonicNow();
// BUG: fix the trace ID.
ContainerProtocolCalls.writeSmallFile(client, containerName,
Long.toString(block.getBlockID()), data, "");
Long.toString(block.getBlockID()), data,
flusher.getTraceID(new File(dbPath), block.getBlockID()));
endTime = Time.monotonicNow();
flusher.getTargetMetrics().updateContainerWriteLatency(
endTime - startTime);

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.Options;
import org.slf4j.Logger;
@ -37,6 +38,8 @@
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
@ -45,6 +48,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -103,6 +107,7 @@ public class ContainerCacheFlusher implements Runnable {
private AtomicBoolean shutdown;
private final long levelDBCacheSize;
private final int maxRetryCount;
private final String tracePrefix;
private final ConcurrentMap<String, FinishCounter> finishCountMap;
@ -158,6 +163,7 @@ public ContainerCacheFlusher(Configuration config,
this.maxRetryCount =
config.getInt(CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_KEY,
CBlockConfigKeys.DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT);
this.tracePrefix = getTracePrefix();
}
private void checkExistingLog(String prefixFileName, File dbPath) {
@ -436,6 +442,40 @@ public void run() {
LOG.info("Exiting flusher");
}
/**
* Tries to get the local host IP Address as trace prefix
* for creating trace IDs, otherwise uses a random UUID for it.
*/
private static String getTracePrefix() {
String tmp;
try {
tmp = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException ex) {
tmp = UUID.randomUUID().toString();
LOG.error("Unable to read the host address. Using a GUID for " +
"hostname:{} ", tmp, ex);
}
return tmp;
}
/**
* We create a trace ID to make it easy to debug issues.
* A trace ID is in IPAddress:UserName:VolumeName:blockID:second format.
*
* This will get written down on the data node if we get any failures, so
* with this trace ID we can correlate cBlock failures across machines.
*
* @param blockID - Block ID
* @return trace ID
*/
public String getTraceID(File dbPath, long blockID) {
String volumeName = dbPath.getName();
String userName = dbPath.getParentFile().getName();
// mapping to seconds to make the string smaller.
return tracePrefix + ":" + userName + ":" + volumeName
+ ":" + blockID + ":" + Time.monotonicNow() / 1000;
}
/**
* Keeps a Reference counted DB that we close only when the total Reference
* has gone to zero.

View File

@ -174,9 +174,9 @@ public void writeBlock(LogicalBlock block) throws IOException {
long startTime = Time.monotonicNow();
client = parentCache.getClientManager()
.acquireClient(parentCache.getPipeline(block.getBlockID()));
// BUG: fix the trace ID.
ContainerProtocolCalls.writeSmallFile(client, containerName,
Long.toString(block.getBlockID()), block.getData().array(), "");
Long.toString(block.getBlockID()), block.getData().array(),
parentCache.getTraceID(block.getBlockID()));
long endTime = Time.monotonicNow();
if (parentCache.isTraceEnabled()) {
String datahash = DigestUtils.sha256Hex(block.getData().array());

View File

@ -27,23 +27,19 @@
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.LevelDBStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.UUID;
import static org.apache.hadoop.cblock.CBlockConfigKeys
.DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT;
@ -74,7 +70,7 @@ public class CBlockLocalCache implements CacheModule {
private final LevelDBStore cacheDB;
/**
* Asyncblock writer updates the cacheDB and writes the blocks async to
* AsyncBlock writer updates the cacheDB and writes the blocks async to
* remote containers.
*/
private final AsyncBlockWriter blockWriter;
@ -85,17 +81,8 @@ public class CBlockLocalCache implements CacheModule {
* update the cacheDB.
*/
private final SyncBlockReader blockReader;
/**
* We create a trace ID to make it easy to debug issues.
* A trace ID is in the following format. IPAddress:VolumeName:blockID:second
* <p>
* This will get written down on the data node if we get any failures, so
* with this trace ID we can correlate cBlock failures across machines.
*/
private final String userName;
private final String volumeName;
private final String ipAddressString;
private final String tracePrefix;
/**
* From a block ID we are able to get the pipeline by indexing this array.
@ -160,8 +147,6 @@ public File getDbPath() {
cacheDB = flusher.getCacheDB(dbPath.toString());
this.containerList = containerPipelines.toArray(new
Pipeline[containerPipelines.size()]);
this.ipAddressString = getHostIP();
this.tracePrefix = ipAddressString + ":" + this.volumeName;
this.volumeSize = volumeSize;
blockWriter = new AsyncBlockWriter(conf, this);
@ -325,22 +310,6 @@ public boolean isDirtyCache() {
return false;
}
/**
* Tries to get the local host IP Address for creating trace IDs.
*/
private String getHostIP() {
String tmp;
try {
tmp = InetAddress.getLocalHost().toString();
} catch (UnknownHostException ex) {
tmp = UUID.randomUUID().toString();
LOG.error("Unable to read the host address. Using a GUID for " +
"hostname:{} ", tmp, ex);
}
return tmp;
}
/**
* Returns the local cache DB.
*
@ -397,18 +366,8 @@ Pipeline getPipeline(long blockId) {
return containerList[containerIdx];
}
/**
* Returns a traceID based in Block ID.
* The format is HostIP:VolumeName:BlockID:timeStamp, in case of error this
* will be logged on the container side.
*
* @param blockID - Block ID
* @return trace ID
*/
String getTraceID(long blockID) {
// mapping to seconds to make the string smaller.
return this.tracePrefix + ":" + blockID + ":"
+ Time.monotonicNow() / 1000;
return flusher.getTraceID(dbPath, blockID);
}
/**