Compare commits
10 Commits
e1d4e6e26d ... affa2ee36c

Author | SHA1 | Date
---|---|---
 | affa2ee36c |
 | 1c64c6aa3c |
 | 8bf19a216c |
 | 8d242458e9 |
 | 8677cbd3e0 |
 | b0e5465244 |
 | aa9f3e1da4 |
 | b58553bb1e |
 | a79791fb92 |
 | ac56c2f364 |
@@ -277,6 +277,7 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
     while (shouldRetry) {
       shouldRetry = false;
       try {
+        // Create the file via an RPC call to the namenode (or a router, which ultimately calls the namenode anyway).
         stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
             new EnumSetWritable<>(flag), createParent, replication,
             blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName,
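The annotated hunk above is the client half of file creation: the namenode RPC sits inside a retry loop. Below is a minimal, self-contained sketch of that retry-on-exception shape; the interface, the attempt bound, and the blanket catch are illustrative stand-ins, since the real client drives retries through its RetryPolicy machinery (e.g. RetryStartFileException) rather than a fixed count.

import java.io.IOException;

public class CreateRetrySketch {
  // Stand-in for the namenode RPC proxy; not the real ClientProtocol.
  interface NamenodeRpc {
    String create(String src) throws IOException;
  }

  // Retry the RPC a bounded number of times, mirroring the shouldRetry loop.
  static String createWithRetry(NamenodeRpc namenode, String src, int maxAttempts)
      throws IOException {
    IOException last = null;
    boolean shouldRetry = true;
    for (int attempt = 0; shouldRetry && attempt < maxAttempts; attempt++) {
      shouldRetry = false;
      try {
        return namenode.create(src);  // RPC to the namenode (or router)
      } catch (IOException e) {
        last = e;
        shouldRetry = true;           // try again on failure
      }
    }
    throw last != null ? last : new IOException("create never attempted");
  }

  public static void main(String[] args) throws IOException {
    NamenodeRpc fake = src -> "created:" + src; // in-memory stand-in for the proxy
    System.out.println(createWithRetry(fake, "/tmp/demo", 3));
  }
}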
@@ -1121,6 +1121,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
       HdfsClientConfigKeys
           .DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT;
+  // Keytab used by the namenode at startup.
   public static final String DFS_NAMENODE_KEYTAB_FILE_KEY = "dfs.namenode.keytab.file";
   public static final String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY =
       HdfsClientConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
@@ -42,6 +42,9 @@
  *
  * This is responsible for sending edits as well as coordinating
  * recovery of the nodes.
+ * Mainly used by the Journal component to handle the edit log:
+ * 1. accept edit log entries
+ * 2. send edits to the namenode to make recovery easier
  */
 @KerberosInfo(
     serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY,
@@ -101,6 +104,7 @@ public void journal(RequestInfo reqInfo,

   /**
    * Start writing to a new log segment on the JournalNode.
+   * The NameNode sends a command telling the JournalNode to open a new log segment (LogSegment).
    * Before calling this, one should finalize the previous segment
    * using {@link #finalizeLogSegment(RequestInfo, long, long)}.
    *
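The added javadoc line restates the segment lifecycle contract: finalize the previous segment, then open a new one. A toy sketch of that ordering, using a simplified stand-in interface rather than the real QJournalProtocol signatures (which also carry RequestInfo, layout versions, and record buffers):

// Simplified stand-in for the JournalNode-facing protocol.
interface JournalSketch {
  void startLogSegment(long txId);
  void journal(long segmentTxId, long firstTxnId, byte[] records);
  void finalizeLogSegment(long startTxId, long endTxId);
}

class SegmentRoller {
  // Roll to a new segment: finalize [segStart, lastTxId], then open a new
  // segment at lastTxId + 1, matching the contract in the javadoc.
  static void roll(JournalSketch jn, long segStart, long lastTxId) {
    jn.finalizeLogSegment(segStart, lastTxId);
    jn.startLogSegment(lastTxId + 1);
  }

  public static void main(String[] args) {
    JournalSketch jn = new JournalSketch() {
      public void startLogSegment(long txId) { System.out.println("start@" + txId); }
      public void journal(long s, long f, byte[] r) { /* no-op */ }
      public void finalizeLogSegment(long s, long e) { System.out.println("finalize[" + s + "," + e + "]"); }
    };
    roll(jn, 1, 100); // finalize [1,100], then open a segment at txid 101
  }
}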
@@ -1772,7 +1772,10 @@ private void addCacheCommands(String blockPoolId, DatanodeDescriptor nodeinfo,
     }
   }

-  /** Handle heartbeat from datanodes. */
+  /**
+   * Handle heartbeat from datanodes.
+   * The notorious heartbeat: it does a bit of everything...
+   */
   public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       StorageReport[] reports, final String blockPoolId,
       long cacheCapacity, long cacheUsed, int xceiverCount,
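handleHeartbeat is the server half of the DataNode heartbeat exchange: it absorbs the storage reports and piggybacks any pending commands on the reply. A stripped-down sketch of that request/response shape; every type and the over-90%-full trigger here are invented stand-ins for DatanodeRegistration, StorageReport, and DatanodeCommand:

import java.util.ArrayList;
import java.util.List;

class HeartbeatSketch {
  static class Report {
    final String storageId; final long capacity; final long used;
    Report(String s, long c, long u) { storageId = s; capacity = c; used = u; }
  }

  // Server side: absorb the reports, then piggyback pending commands on the
  // reply, the same shape handleHeartbeat uses with DatanodeCommand[].
  static String[] handleHeartbeat(String nodeId, List<Report> reports) {
    List<String> commands = new ArrayList<>();
    for (Report r : reports) {
      if (r.used * 10 > r.capacity * 9) {       // >90% full: illustrative trigger
        commands.add("BALANCE:" + r.storageId);
      }
    }
    return commands.toArray(new String[0]);
  }

  public static void main(String[] args) {
    String[] cmds = handleHeartbeat("dn-1",
        List.of(new Report("s1", 100, 95), new Report("s2", 100, 10)));
    for (String c : cmds) System.out.println(c);
  }
}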
@@ -3095,8 +3095,10 @@ public static DataNode createDataNode(String args[],
   @InterfaceAudience.Private
   public static DataNode createDataNode(String args[], Configuration conf,
       SecureResources resources) throws IOException {
+    // Instantiate the DataNode.
     DataNode dn = instantiateDataNode(args, conf, resources);
     if (dn != null) {
+      // Start the DataNode daemon.
       dn.runDatanodeDaemon();
     }
     return dn;
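A usage sketch for the factory above: createDataNode both instantiates and starts the daemon, so a caller only needs to block until it exits. This mirrors how DataNode.secureMain drives it, as far as I recall; treat dn.join() and passing resources = null outside secure mode as assumptions to verify against your Hadoop version.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

public class DataNodeLauncher {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Instantiate and start the DataNode (resources = null outside secure mode).
    DataNode dn = DataNode.createDataNode(args, conf, null);
    if (dn != null) {
      dn.join(); // block until the daemon exits (assumed API, as in secureMain)
    }
  }
}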
@@ -59,6 +59,7 @@ static long delete(FSDirectory fsd, INodesInPath iip,
     ReclaimContext context = new ReclaimContext(
         fsd.getBlockStoragePolicySuite(), collectedBlocks, removedINodes,
         removedUCFiles);
+    // The core deletion logic is inside this call; it invokes destroyAndCollectBlocks to remove the blocks.
     if (unprotectedDelete(fsd, iip, context, mtime)) {
       filesRemoved = context.quotaDelta().getNsDelta();
       fsn.removeSnapshottableDirs(snapshottableDirs);
@@ -255,6 +256,7 @@ private static boolean unprotectedDelete(FSDirectory fsd, INodesInPath iip,

     // collect block and update quota
     if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
+      // The actual deletion happens here.
       targetNode.destroyAndCollectBlocks(reclaimContext);
     } else {
       targetNode.cleanSubtree(reclaimContext, CURRENT_STATE_ID, latestSnapshot);
@@ -151,7 +151,10 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
         .isdir(true)
         .build();

+  // Root of the whole file system directory tree; it is an INodeDirectory.
   INodeDirectory rootDir;
+
+  // A core NameNode class; it mainly provides methods that operate on blocks, e.g. addBlock().
   private final FSNamesystem namesystem;
   private volatile boolean skipQuotaCheck = false; //skip while consuming edits
   private final int maxComponentLength;
@@ -159,6 +162,7 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
   private final int lsLimit; // max list limit
   private final int contentCountLimit; // max content summary counts per run
   private final long contentSleepMicroSec;
+  // Records every INode under the root directory and maintains the INodeId -> INode mapping.
   private final INodeMap inodeMap; // Synchronized by dirLock
   private long yieldCount = 0; // keep track of lock yield count.
   private int quotaInitThreads;
@@ -197,6 +201,9 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
   private final String supergroup;
   private final INodeId inodeId;

+  /**
+   * Writes the edit log.
+   */
   private final FSEditLog editLog;

   private HdfsFileStatus[] reservedStatuses;
@@ -300,6 +307,7 @@ public int getListLimit() {
   /**
    * Caches frequently used file names used in {@link INode} to reuse
    * byte[] objects and reduce heap usage.
+   * Caches commonly used names to cut down on byte[] allocations and JVM heap usage.
    */
   private final NameCache<ByteArray> nameCache;

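The NameCache comment is about interning: frequently seen file-name byte[]s are canonicalized so identical names share one array and the heap holds fewer duplicates. A minimal interning sketch; the real NameCache additionally has a promotion threshold and an initialization phase, both omitted here.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class NameInternSketch {
  private final Map<String, byte[]> cache = new HashMap<>();

  // Return a canonical byte[] for this name so repeated names share storage.
  byte[] intern(byte[] name) {
    String key = new String(name, StandardCharsets.UTF_8);
    byte[] canonical = cache.get(key);
    if (canonical == null) {
      cache.put(key, name);
      return name;
    }
    return canonical;
  }
}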
@@ -318,6 +326,7 @@ public enum DirOp {
     this.inodeId = new INodeId();
     rootDir = createRoot(ns);
     inodeMap = INodeMap.newInstance(rootDir);
+    // Whether permission checking is enabled.
     this.isPermissionEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
         DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
@@ -225,6 +225,7 @@ public void load(File file, boolean requireSameLayoutVersion)
       FSImageFormatProtobuf.Loader loader = new FSImageFormatProtobuf.Loader(
           conf, fsn, requireSameLayoutVersion);
       impl = loader;
+      // Start loading the fsimage.
       loader.load(file);
     } else {
       Loader loader = new Loader(conf, fsn);
@@ -360,6 +360,7 @@ private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
     if (!FSImageUtil.checkFileFormat(raFile)) {
       throw new IOException("Unrecognized file format");
     }
+    // Load the file summary.
     FileSummary summary = FSImageUtil.loadSummary(raFile);
     if (requireSameLayoutVersion && summary.getLayoutVersion() !=
         HdfsServerConstants.NAMENODE_LAYOUT_VERSION) {
@@ -399,6 +400,7 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
      * a particular step to be started for once.
      */
     Step currentStep = null;
+    // Whether parallel loading is enabled.
     boolean loadInParallel = enableParallelSaveAndLoad(conf);

     ExecutorService executorService = null;
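loadInParallel decides whether image sections are handed to an executor or loaded inline. A sketch of that conditional fan-out; the pool size, Runnable task shape, and latch-based join are placeholders for the real loader's per-section executor logic.

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelLoadSketch {
  static void loadSections(List<Runnable> sectionLoaders, boolean loadInParallel)
      throws InterruptedException {
    if (!loadInParallel) {
      for (Runnable r : sectionLoaders) r.run();   // serial fallback
      return;
    }
    ExecutorService pool = Executors.newFixedThreadPool(4);
    CountDownLatch done = new CountDownLatch(sectionLoaders.size());
    for (Runnable r : sectionLoaders) {
      pool.execute(() -> { try { r.run(); } finally { done.countDown(); } });
    }
    done.await();      // wait for every section before continuing startup
    pool.shutdown();
  }
}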
@@ -805,6 +805,7 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {

     long loadStart = monotonicNow();
     try {
+      // Load the FSImage and merge it with the EditLog into a new FSImage, so startup can be slow at this point.
       namesystem.loadFSImage(startOpt);
     } catch (IOException ioe) {
       LOG.warn("Encountered exception loading fsimage", ioe);
@@ -1217,12 +1218,13 @@ private List<AuditLogger> initAuditLoggers(Configuration conf) {
     return Collections.unmodifiableList(auditLoggers);
   }

+  // Once FSNamesystem has initialized its FSDirectory dir member, it calls loadFSImage to load the metadata from the fsimage and edits files.
   private void loadFSImage(StartupOption startOpt) throws IOException {
     final FSImage fsImage = getFSImage();

-    // format before starting up if requested
+    // format before starting up if requested // if the startup option is FORMAT, format before starting up
     if (startOpt == StartupOption.FORMAT) {
-      // reuse current id
+      // reuse current id // run the format operation on the FSImage
       fsImage.format(this, fsImage.getStorage().determineClusterId(), false);

       startOpt = StartupOption.REGULAR;
@@ -1231,6 +1233,7 @@ private void loadFSImage(StartupOption startOpt) throws IOException {
     writeLock();
     try {
       // We shouldn't be calling saveNamespace if we've come up in standby state.
+      // Based on the startup option and its storage directories (${dfs.name.dir}), analyze the storage state and, if necessary, recover from an earlier transaction.
       MetaRecoveryContext recovery = startOpt.createRecoveryContext();
       final boolean staleImage
           = fsImage.recoverTransitionRead(startOpt, this, recovery);
@@ -1302,14 +1305,17 @@ void startCommonServices(Configuration conf, HAContext haContext) throws IOExcep
     writeLock();
     this.haContext = haContext;
     try {
+      // Create the NameNodeResourceChecker and run a check right away.
       nnResourceChecker = new NameNodeResourceChecker(conf);
       checkAvailableResources();
       assert !blockManager.isPopulatingReplQueues();
       StartupProgress prog = NameNode.getStartupProgress();
       prog.beginPhase(Phase.SAFEMODE);
+      // Get the total number of complete blocks.
       long completeBlocksTotal = getCompleteBlocksTotal();
       prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
           completeBlocksTotal);
+      // Activate the blockManager; it manages the mapping from a file's blocks to their actual storage locations, one of the NameNode's core functions.
       blockManager.activate(conf, completeBlocksTotal);
     } finally {
       writeUnlock("startCommonServices");
@@ -1321,6 +1327,7 @@ void startCommonServices(Configuration conf, HAContext haContext) throws IOExcep
       inodeAttributeProvider.start();
       dir.setINodeAttributeProvider(inodeAttributeProvider);
     }
+    // Register the snapshot manager's MXBean.
     snapshotManager.registerMXBean();
     InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
     this.nameNodeHostName = (serviceAddress != null) ?
@@ -34,6 +34,7 @@
 @InterfaceStability.Unstable
 public abstract class INodeAttributeProvider {

+  // Access-control enforcer interface.
   public static class AuthorizationContext {
     private String fsOwner;
     private String supergroup;
@@ -746,10 +746,10 @@ protected void initialize(Configuration conf) throws IOException {
           intervals);
       }
     }

+    // Log in via Kerberos.
     UserGroupInformation.setConfiguration(conf);
     loginAsNameNodeUser(conf);

+    // Register metrics.
     NameNode.initMetrics(conf, this.getRole());
     StartupProgressMetrics.register(startupProgress);

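The "Log in via Kerberos" step keys off the keytab/principal entries shown in the DFSConfigKeys hunk earlier. A hedged sketch of how loginAsNameNodeUser plausibly wires them into SecurityUtil.login; the exact signature and hostname handling are assumptions to verify against your version.

import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;

class NameNodeLoginSketch {
  static void login(Configuration conf) throws IOException {
    UserGroupInformation.setConfiguration(conf);
    // Resolve the keytab/principal keys and perform the Kerberos login;
    // this mirrors loginAsNameNodeUser (assumed signature).
    SecurityUtil.login(conf,
        DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,
        DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
        InetAddress.getLocalHost().getHostName());
  }
}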
@@ -775,12 +775,16 @@ protected void initialize(Configuration conf) throws IOException {
     }

     if (NamenodeRole.NAMENODE == role) {
+      // Start the HTTP server; this calls NameNodeHttpServer#start, which is implemented on top of org.eclipse.jetty.server.Server.
       startHttpServer(conf);
     }

+    // Load the FSImage from local storage and merge it with the edit log to produce a new FSImage.
     loadNamesystem(conf);
+    // TODO: purpose still to be confirmed.
     startAliasMapServerIfNecessary(conf);

+    // Create the RPC server; it wraps NameNodeRpcServer / ClientRPCServer
+    // and supports protocols such as ClientNameNodeProtocol and DataNodeProtocolPB.
     rpcServer = createRpcServer(conf);

     initReconfigurableBackoffKey();
@@ -801,6 +805,7 @@ protected void initialize(Configuration conf) throws IOException {
       }
     }

+    // Start several important worker threads.
     startCommonServices(conf);
     startMetricsLogger(conf);
   }
@@ -880,6 +885,7 @@ protected NameNodeRpcServer createRpcServer(Configuration conf)

   /** Start the services common to active and standby states */
   private void startCommonServices(Configuration conf) throws IOException {
+    // Creates the NameNodeResourceChecker, activates the BlockManager, and so on.
     namesystem.startCommonServices(conf, haContext);
     registerNNSMXBean();
     if (NamenodeRole.NAMENODE != role) {
@@ -890,8 +896,10 @@ private void startCommonServices(Configuration conf) throws IOException {
         httpServer.setAliasMap(levelDBAliasMapServer.getAliasMap());
       }
     }
+    // Start the RPC service.
     rpcServer.start();
     try {
+      // Load the list of configured startup plugins.
       plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
           ServicePlugin.class);
     } catch (RuntimeException e) {
@@ -900,8 +908,10 @@ private void startCommonServices(Configuration conf) throws IOException {
           pluginsValue, e);
       throw e;
     }
+    // Start all the plugins.
     for (ServicePlugin p: plugins) {
       try {
+        // Call the plugin's start hook; each plugin provides its own implementation of the ServicePlugin interface.
         p.start(this);
       } catch (Throwable t) {
         LOG.warn("ServicePlugin " + p + " could not be started", t);
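The loop calls start(this) on every configured plugin. A skeleton plugin, assuming the ServicePlugin interface exposes start(Object) and stop() as the loop suggests; close() is included defensively in case the interface extends Closeable in your version. Register it by listing the class name under dfs.namenode.plugins.

import org.apache.hadoop.util.ServicePlugin;

// Registered through the dfs.namenode.plugins configuration key; the
// NameNode passes itself as the 'service' argument.
public class LoggingPlugin implements ServicePlugin {
  @Override
  public void start(Object service) {
    System.out.println("plugin started inside " + service.getClass().getName());
  }

  @Override
  public void stop() {
    System.out.println("plugin stopped");
  }

  // Delegates to stop(); present in case ServicePlugin extends Closeable.
  public void close() {
    stop();
  }
}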
@@ -1025,11 +1035,13 @@ protected NameNode(Configuration conf, NamenodeRole role)
           + " this namenode/service.", clientNamenodeAddress);
     }
     this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
+    // Check the HA state, mainly to determine whether this instance starts as active or standby.
     state = createHAState(getStartupOption(conf));
     this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
     this.haContext = createHAContext();
     try {
       initializeGenericKeys(conf, nsId, namenodeId);
+      // Initialize and start the NameNode.
       initialize(getConf());
       state.prepareToEnterState(haContext);
       try {
@@ -785,6 +785,7 @@ public HdfsFileStatus create(String src, FsPermission masked,
       String storagePolicy)
       throws IOException {
     checkNNStartup();
+    // Get the client's IP address.
     String clientMachine = getClientMachine();
     if (stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*DIR* NameNode.create: file "
@@ -1676,6 +1676,7 @@ public static void main(String[] args) {
       ShutdownHookManager.get().addShutdownHook(
         new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
       JobConf conf = new JobConf(new YarnConfiguration());
+      // Load the job.xml configuration.
       conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));

       MRWebAppUtil.initialize(conf);
@@ -252,6 +252,7 @@ public void run() {
   @SuppressWarnings("unchecked")
   protected void handleJobSetup(CommitterJobSetupEvent event) {
     try {
+      // Mainly creates the attempt paths.
       committer.setupJob(event.getJobContext());
       context.getEventHandler().handle(
           new JobSetupCompletedEvent(event.getJobID()));
@@ -1455,6 +1455,7 @@ public JobStateInternal transition(JobImpl job, JobEvent event) {
       }

       try {
+        // Initialize tokens and related state.
         setup(job);
         job.fs = job.getFileSystem(job.conf);

@@ -1669,12 +1669,14 @@ public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
     ensureState(JobState.DEFINE);
     setUseNewAPI();
+    // Connect to the ResourceManager.
     connect();
     final JobSubmitter submitter =
         getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
     status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
       public JobStatus run() throws IOException, InterruptedException,
           ClassNotFoundException {
+        // Submit the job.
         return submitter.submitJobInternal(Job.this, cluster);
       }
     });
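A driver-side usage sketch for this submission path: waitForCompletion() calls submit() internally, so the connect()-then-doAs sequence above runs under the hood. The input/output wiring below is generic boilerplate, not tied to any particular job.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SubmitSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "demo");
    job.setJarByClass(SubmitSketch.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Triggers connect() to the RM and submitJobInternal under a doAs.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}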
@@ -213,6 +213,8 @@ public void finishApplicationAttempt(FiCaSchedulerApp application,

   /**
    * Assign containers to applications in the queue or it's children (if any).
+   * Allocates resources across both the parent queue and its child queues.
    *
    * @param clusterResource the resource of the cluster.
    * @param candidates {@link CandidateNodeSet} the nodes that are considered
    * for the current placement.
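The annotated javadoc says allocation walks the queue hierarchy: a parent queue satisfies an ask by delegating to its children in turn. A toy sketch of that recursive delegation with plain stand-in classes rather than the CapacityScheduler's CSQueue types:

import java.util.ArrayList;
import java.util.List;

abstract class QueueSketch {
  final String name;
  QueueSketch(String name) { this.name = name; }
  // Return how much of 'ask' this subtree could place.
  abstract int assign(int ask);
}

class LeafSketch extends QueueSketch {
  int headroom;
  LeafSketch(String name, int headroom) { super(name); this.headroom = headroom; }
  @Override int assign(int ask) {
    int granted = Math.min(ask, headroom);
    headroom -= granted;
    return granted;
  }
}

class ParentSketch extends QueueSketch {
  final List<QueueSketch> children = new ArrayList<>();
  ParentSketch(String name) { super(name); }
  // Delegate to children until the ask is satisfied, as the javadoc describes.
  @Override int assign(int ask) {
    int granted = 0;
    for (QueueSketch child : children) {
      if (granted >= ask) break;
      granted += child.assign(ask - granted);
    }
    return granted;
  }
}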
@@ -515,6 +515,13 @@ long getAsyncScheduleInterval() {

   private final static Random random = new Random(System.currentTimeMillis());

+  /**
+   * Skip nodes that have gone two heartbeat intervals without reporting.
+   * @param node
+   * @param cs
+   * @param printVerboseLog
+   * @return
+   */
   private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
       CapacityScheduler cs, boolean printVerboseLog) {
     // Skip node which missed 2 heartbeats since the node might be dead and
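The new helper's contract is a staleness test: a node that has gone about two heartbeat intervals without reporting is skipped, since it may be dead and containers allocated to it would stall. The predicate in isolation, with the clock and interval passed in as plain parameters rather than read from the scheduler:

class NodeSkipSketch {
  // Skip nodes whose last heartbeat is older than two intervals; the node
  // may be dead, and scheduling onto it would only park containers there.
  static boolean shouldSkipNodeSchedule(long lastHeartbeatMs, long nowMs,
      long heartbeatIntervalMs) {
    return nowMs - lastHeartbeatMs > 2 * heartbeatIntervalMs;
  }
}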
@@ -557,7 +564,7 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{
       printedVerboseLoggingForAsyncScheduling = false;
     }

-    // Allocate containers of node [start, end)
+    // Allocate containers of node [start, end); starting from a random node keeps the allocation balanced.
     for (FiCaSchedulerNode node : nodes) {
       if (current++ >= start) {
         if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
@@ -569,7 +576,7 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{

     current = 0;

-    // Allocate containers of node [0, start)
+    // Allocate containers of node [0, start); from 0 up to the random start node.
     for (FiCaSchedulerNode node : nodes) {
       if (current++ > start) {
         break;
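Together the two loops perform a wrap-around sweep from a random offset, so each asynchronous scheduling pass starts at a different node and no node is systematically favored. The same pattern in isolation:

import java.util.List;
import java.util.Random;
import java.util.function.Consumer;

class RandomStartSweepSketch {
  private static final Random RANDOM = new Random();

  // Visit nodes [start, end) then [0, start), mirroring the two loops above.
  static <T> void sweep(List<T> nodes, Consumer<T> visit) {
    int n = nodes.size();
    if (n == 0) return;
    int start = RANDOM.nextInt(n);
    for (int i = start; i < n; i++) visit.accept(nodes.get(i));
    for (int i = 0; i < start; i++) visit.accept(nodes.get(i));
  }
}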
@@ -1796,6 +1803,7 @@ CSAssignment allocateContainersToNode(
     if (!multiNodePlacementEnabled) {
       ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
           node.getNodeID());
+      // Allocate on a single node: the legacy code path.
       assignment = allocateContainerOnSingleNode(candidates,
           node, withNodeHeartbeat);
       ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
@@ -1803,6 +1811,7 @@ CSAssignment allocateContainersToNode(
     } else{
       ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
           ActivitiesManager.EMPTY_NODE_ID);
+      // Allocate across multiple nodes: the new code path.
       assignment = allocateContainersOnMultiNodes(candidates);
       ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
           ActivitiesManager.EMPTY_NODE_ID, candidates.getPartition());