HDFS-9027. Refactor o.a.h.hdfs.DataStreamer#isLazyPersist() method. (Contributed by Mingliang Liu)
This commit is contained in:
parent
ca0827a862
commit
15a557fcfe
@ -34,12 +34,20 @@ public final class HdfsConstants {
|
|||||||
* URI Scheme for hdfs://namenode/ URIs.
|
* URI Scheme for hdfs://namenode/ URIs.
|
||||||
*/
|
*/
|
||||||
public static final String HDFS_URI_SCHEME = "hdfs";
|
public static final String HDFS_URI_SCHEME = "hdfs";
|
||||||
|
|
||||||
|
public static final byte MEMORY_STORAGE_POLICY_ID = 15;
|
||||||
public static final String MEMORY_STORAGE_POLICY_NAME = "LAZY_PERSIST";
|
public static final String MEMORY_STORAGE_POLICY_NAME = "LAZY_PERSIST";
|
||||||
|
public static final byte ALLSSD_STORAGE_POLICY_ID = 12;
|
||||||
public static final String ALLSSD_STORAGE_POLICY_NAME = "ALL_SSD";
|
public static final String ALLSSD_STORAGE_POLICY_NAME = "ALL_SSD";
|
||||||
|
public static final byte ONESSD_STORAGE_POLICY_ID = 10;
|
||||||
public static final String ONESSD_STORAGE_POLICY_NAME = "ONE_SSD";
|
public static final String ONESSD_STORAGE_POLICY_NAME = "ONE_SSD";
|
||||||
|
public static final byte HOT_STORAGE_POLICY_ID = 7;
|
||||||
public static final String HOT_STORAGE_POLICY_NAME = "HOT";
|
public static final String HOT_STORAGE_POLICY_NAME = "HOT";
|
||||||
|
public static final byte WARM_STORAGE_POLICY_ID = 5;
|
||||||
public static final String WARM_STORAGE_POLICY_NAME = "WARM";
|
public static final String WARM_STORAGE_POLICY_NAME = "WARM";
|
||||||
|
public static final byte COLD_STORAGE_POLICY_ID = 2;
|
||||||
public static final String COLD_STORAGE_POLICY_NAME = "COLD";
|
public static final String COLD_STORAGE_POLICY_NAME = "COLD";
|
||||||
|
|
||||||
// TODO should be conf injected?
|
// TODO should be conf injected?
|
||||||
public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
|
public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
|
||||||
/**
|
/**
|
||||||
|
@ -906,6 +906,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HDFS-6763. Initialize file system-wide quota once on transitioning to active
|
HDFS-6763. Initialize file system-wide quota once on transitioning to active
|
||||||
(kihwal)
|
(kihwal)
|
||||||
|
|
||||||
|
HDFS-9027. Refactor o.a.h.hdfs.DataStreamer#isLazyPersist() method.
|
||||||
|
(Mingliang Liu via Arpit Agarwal)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
@ -46,7 +46,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
@ -69,7 +68,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
|
|||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
||||||
import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
||||||
@ -155,9 +153,7 @@ class DataStreamer extends Daemon {
|
|||||||
* @return if this file is lazy persist
|
* @return if this file is lazy persist
|
||||||
*/
|
*/
|
||||||
static boolean isLazyPersist(HdfsFileStatus stat) {
|
static boolean isLazyPersist(HdfsFileStatus stat) {
|
||||||
final BlockStoragePolicy p = blockStoragePolicySuite.getPolicy(
|
return stat.getStoragePolicy() == HdfsConstants.MEMORY_STORAGE_POLICY_ID;
|
||||||
HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
|
|
||||||
return p != null && stat.getStoragePolicy() == p.getId();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -379,8 +375,6 @@ class DataStreamer extends Daemon {
|
|||||||
private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
|
private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
|
||||||
private final AtomicReference<CachingStrategy> cachingStrategy;
|
private final AtomicReference<CachingStrategy> cachingStrategy;
|
||||||
private final ByteArrayManager byteArrayManager;
|
private final ByteArrayManager byteArrayManager;
|
||||||
private static final BlockStoragePolicySuite blockStoragePolicySuite =
|
|
||||||
BlockStoragePolicySuite.createDefaultSuite();
|
|
||||||
//persist blocks on namenode
|
//persist blocks on namenode
|
||||||
private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
|
private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
|
||||||
private boolean failPacket = false;
|
private boolean failPacket = false;
|
||||||
|
@ -26,7 +26,6 @@ import org.apache.hadoop.fs.XAttr;
|
|||||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -48,37 +47,37 @@ public class BlockStoragePolicySuite {
|
|||||||
public static BlockStoragePolicySuite createDefaultSuite() {
|
public static BlockStoragePolicySuite createDefaultSuite() {
|
||||||
final BlockStoragePolicy[] policies =
|
final BlockStoragePolicy[] policies =
|
||||||
new BlockStoragePolicy[1 << ID_BIT_LENGTH];
|
new BlockStoragePolicy[1 << ID_BIT_LENGTH];
|
||||||
final byte lazyPersistId = HdfsServerConstants.MEMORY_STORAGE_POLICY_ID;
|
final byte lazyPersistId = HdfsConstants.MEMORY_STORAGE_POLICY_ID;
|
||||||
policies[lazyPersistId] = new BlockStoragePolicy(lazyPersistId,
|
policies[lazyPersistId] = new BlockStoragePolicy(lazyPersistId,
|
||||||
HdfsConstants.MEMORY_STORAGE_POLICY_NAME,
|
HdfsConstants.MEMORY_STORAGE_POLICY_NAME,
|
||||||
new StorageType[]{StorageType.RAM_DISK, StorageType.DISK},
|
new StorageType[]{StorageType.RAM_DISK, StorageType.DISK},
|
||||||
new StorageType[]{StorageType.DISK},
|
new StorageType[]{StorageType.DISK},
|
||||||
new StorageType[]{StorageType.DISK},
|
new StorageType[]{StorageType.DISK},
|
||||||
true); // Cannot be changed on regular files, but inherited.
|
true); // Cannot be changed on regular files, but inherited.
|
||||||
final byte allssdId = HdfsServerConstants.ALLSSD_STORAGE_POLICY_ID;
|
final byte allssdId = HdfsConstants.ALLSSD_STORAGE_POLICY_ID;
|
||||||
policies[allssdId] = new BlockStoragePolicy(allssdId,
|
policies[allssdId] = new BlockStoragePolicy(allssdId,
|
||||||
HdfsConstants.ALLSSD_STORAGE_POLICY_NAME,
|
HdfsConstants.ALLSSD_STORAGE_POLICY_NAME,
|
||||||
new StorageType[]{StorageType.SSD},
|
new StorageType[]{StorageType.SSD},
|
||||||
new StorageType[]{StorageType.DISK},
|
new StorageType[]{StorageType.DISK},
|
||||||
new StorageType[]{StorageType.DISK});
|
new StorageType[]{StorageType.DISK});
|
||||||
final byte onessdId = HdfsServerConstants.ONESSD_STORAGE_POLICY_ID;
|
final byte onessdId = HdfsConstants.ONESSD_STORAGE_POLICY_ID;
|
||||||
policies[onessdId] = new BlockStoragePolicy(onessdId,
|
policies[onessdId] = new BlockStoragePolicy(onessdId,
|
||||||
HdfsConstants.ONESSD_STORAGE_POLICY_NAME,
|
HdfsConstants.ONESSD_STORAGE_POLICY_NAME,
|
||||||
new StorageType[]{StorageType.SSD, StorageType.DISK},
|
new StorageType[]{StorageType.SSD, StorageType.DISK},
|
||||||
new StorageType[]{StorageType.SSD, StorageType.DISK},
|
new StorageType[]{StorageType.SSD, StorageType.DISK},
|
||||||
new StorageType[]{StorageType.SSD, StorageType.DISK});
|
new StorageType[]{StorageType.SSD, StorageType.DISK});
|
||||||
final byte hotId = HdfsServerConstants.HOT_STORAGE_POLICY_ID;
|
final byte hotId = HdfsConstants.HOT_STORAGE_POLICY_ID;
|
||||||
policies[hotId] = new BlockStoragePolicy(hotId,
|
policies[hotId] = new BlockStoragePolicy(hotId,
|
||||||
HdfsConstants.HOT_STORAGE_POLICY_NAME,
|
HdfsConstants.HOT_STORAGE_POLICY_NAME,
|
||||||
new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY,
|
new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY,
|
||||||
new StorageType[]{StorageType.ARCHIVE});
|
new StorageType[]{StorageType.ARCHIVE});
|
||||||
final byte warmId = HdfsServerConstants.WARM_STORAGE_POLICY_ID;
|
final byte warmId = HdfsConstants.WARM_STORAGE_POLICY_ID;
|
||||||
policies[warmId] = new BlockStoragePolicy(warmId,
|
policies[warmId] = new BlockStoragePolicy(warmId,
|
||||||
HdfsConstants.WARM_STORAGE_POLICY_NAME,
|
HdfsConstants.WARM_STORAGE_POLICY_NAME,
|
||||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
|
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
|
||||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
|
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
|
||||||
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE});
|
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE});
|
||||||
final byte coldId = HdfsServerConstants.COLD_STORAGE_POLICY_ID;
|
final byte coldId = HdfsConstants.COLD_STORAGE_POLICY_ID;
|
||||||
policies[coldId] = new BlockStoragePolicy(coldId,
|
policies[coldId] = new BlockStoragePolicy(coldId,
|
||||||
HdfsConstants.COLD_STORAGE_POLICY_NAME,
|
HdfsConstants.COLD_STORAGE_POLICY_NAME,
|
||||||
new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY,
|
new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY,
|
||||||
|
@ -99,12 +99,6 @@ public interface HdfsServerConstants {
|
|||||||
};
|
};
|
||||||
byte[] DOT_SNAPSHOT_DIR_BYTES
|
byte[] DOT_SNAPSHOT_DIR_BYTES
|
||||||
= DFSUtil.string2Bytes(HdfsConstants.DOT_SNAPSHOT_DIR);
|
= DFSUtil.string2Bytes(HdfsConstants.DOT_SNAPSHOT_DIR);
|
||||||
byte MEMORY_STORAGE_POLICY_ID = 15;
|
|
||||||
byte ALLSSD_STORAGE_POLICY_ID = 12;
|
|
||||||
byte ONESSD_STORAGE_POLICY_ID = 10;
|
|
||||||
byte HOT_STORAGE_POLICY_ID = 7;
|
|
||||||
byte WARM_STORAGE_POLICY_ID = 5;
|
|
||||||
byte COLD_STORAGE_POLICY_ID = 2;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Type of the node
|
* Type of the node
|
||||||
|
@ -76,12 +76,12 @@ public class TestBlockStoragePolicy {
|
|||||||
static final long FILE_LEN = 1024;
|
static final long FILE_LEN = 1024;
|
||||||
static final short REPLICATION = 3;
|
static final short REPLICATION = 3;
|
||||||
|
|
||||||
static final byte COLD = HdfsServerConstants.COLD_STORAGE_POLICY_ID;
|
static final byte COLD = HdfsConstants.COLD_STORAGE_POLICY_ID;
|
||||||
static final byte WARM = HdfsServerConstants.WARM_STORAGE_POLICY_ID;
|
static final byte WARM = HdfsConstants.WARM_STORAGE_POLICY_ID;
|
||||||
static final byte HOT = HdfsServerConstants.HOT_STORAGE_POLICY_ID;
|
static final byte HOT = HdfsConstants.HOT_STORAGE_POLICY_ID;
|
||||||
static final byte ONESSD = HdfsServerConstants.ONESSD_STORAGE_POLICY_ID;
|
static final byte ONESSD = HdfsConstants.ONESSD_STORAGE_POLICY_ID;
|
||||||
static final byte ALLSSD = HdfsServerConstants.ALLSSD_STORAGE_POLICY_ID;
|
static final byte ALLSSD = HdfsConstants.ALLSSD_STORAGE_POLICY_ID;
|
||||||
static final byte LAZY_PERSIST = HdfsServerConstants.MEMORY_STORAGE_POLICY_ID;
|
static final byte LAZY_PERSIST = HdfsConstants.MEMORY_STORAGE_POLICY_ID;
|
||||||
|
|
||||||
@Test (timeout=300000)
|
@Test (timeout=300000)
|
||||||
public void testConfigKeyEnabled() throws IOException {
|
public void testConfigKeyEnabled() throws IOException {
|
||||||
@ -1317,19 +1317,15 @@ public class TestBlockStoragePolicy {
|
|||||||
HdfsFileStatus status = fs.getClient().getFileInfo(file);
|
HdfsFileStatus status = fs.getClient().getFileInfo(file);
|
||||||
// 5. get file policy, it should be parent policy.
|
// 5. get file policy, it should be parent policy.
|
||||||
Assert
|
Assert
|
||||||
.assertTrue(
|
.assertTrue("File storage policy should be HOT",
|
||||||
"File storage policy should be HOT",
|
status.getStoragePolicy() == HOT);
|
||||||
status.getStoragePolicy()
|
|
||||||
== HdfsServerConstants.HOT_STORAGE_POLICY_ID);
|
|
||||||
// 6. restart NameNode for reloading edits logs.
|
// 6. restart NameNode for reloading edits logs.
|
||||||
cluster.restartNameNode(true);
|
cluster.restartNameNode(true);
|
||||||
// 7. get file policy, it should be parent policy.
|
// 7. get file policy, it should be parent policy.
|
||||||
status = fs.getClient().getFileInfo(file);
|
status = fs.getClient().getFileInfo(file);
|
||||||
Assert
|
Assert
|
||||||
.assertTrue(
|
.assertTrue("File storage policy should be HOT",
|
||||||
"File storage policy should be HOT",
|
status.getStoragePolicy() == HOT);
|
||||||
status.getStoragePolicy()
|
|
||||||
== HdfsServerConstants.HOT_STORAGE_POLICY_ID);
|
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user