Compare commits

...

10 Commits

19 changed files with 70 additions and 8 deletions

View File

@ -277,6 +277,7 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
while (shouldRetry) {
shouldRetry = false;
try {
// Create the file via an RPC call to the NameNode (or to a Router, which ultimately forwards the call to the NameNode)
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName,

View File

@ -1121,6 +1121,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
HdfsClientConfigKeys
.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT;
// namenode 启动的keytab
public static final String DFS_NAMENODE_KEYTAB_FILE_KEY = "dfs.namenode.keytab.file";
public static final String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY =
HdfsClientConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;

View File

@ -42,6 +42,9 @@
*
* This is responsible for sending edits as well as coordinating
* recovery of the nodes.
* 主要适用于Journal组件中处理edits log
* 1 接受edit log
* 2 发送edit给namenode方便恢复
*/
@KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY,
@ -101,6 +104,7 @@ public void journal(RequestInfo reqInfo,
/**
* Start writing to a new log segment on the JournalNode.
* NameNode 发送命令让 JournalNode 开启一个新的日志段 LogSegment
* Before calling this, one should finalize the previous segment
* using {@link #finalizeLogSegment(RequestInfo, long, long)}.
*

View File

@ -1772,7 +1772,10 @@ private void addCacheCommands(String blockPoolId, DatanodeDescriptor nodeinfo,
}
}
/** Handle heartbeat from datanodes. */
/**
* Handle heartbeat from datanodes.
* 万恶的heartbeat啥都干
* */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,

View File

@ -3095,8 +3095,10 @@ public static DataNode createDataNode(String args[],
@InterfaceAudience.Private
public static DataNode createDataNode(String args[], Configuration conf,
SecureResources resources) throws IOException {
// 初始化DataNode
DataNode dn = instantiateDataNode(args, conf, resources);
if (dn != null) {
// 启动DataNode
dn.runDatanodeDaemon();
}
return dn;

View File

@ -59,6 +59,7 @@ static long delete(FSDirectory fsd, INodesInPath iip,
ReclaimContext context = new ReclaimContext(
fsd.getBlockStoragePolicySuite(), collectedBlocks, removedINodes,
removedUCFiles);
// The core deletion logic lives in unprotectedDelete, which calls destroyAndCollectBlocks to remove the blocks
if (unprotectedDelete(fsd, iip, context, mtime)) {
filesRemoved = context.quotaDelta().getNsDelta();
fsn.removeSnapshottableDirs(snapshottableDirs);
@ -255,6 +256,7 @@ private static boolean unprotectedDelete(FSDirectory fsd, INodesInPath iip,
// collect block and update quota
if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
// 真正删除代码
targetNode.destroyAndCollectBlocks(reclaimContext);
} else {
targetNode.cleanSubtree(reclaimContext, CURRENT_STATE_ID, latestSnapshot);

View File

@ -151,7 +151,10 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
.isdir(true)
.build();
// 整个文件系统目录树的根节点 是INodeDirectory类型的
INodeDirectory rootDir;
// Namenode的核心类 这个类主要支持对数据块进行操作的一些方法 例如addBlock()
private final FSNamesystem namesystem;
private volatile boolean skipQuotaCheck = false; //skip while consuming edits
private final int maxComponentLength;
@ -159,6 +162,7 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
private final int lsLimit; // max list limit
private final int contentCountLimit; // max content summary counts per run
private final long contentSleepMicroSec;
// 记录根目录下所有的INode,并维护INodeId ->INode的映射关系
private final INodeMap inodeMap; // Synchronized by dirLock
private long yieldCount = 0; // keep track of lock yield count.
private int quotaInitThreads;
@ -197,6 +201,9 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
private final String supergroup;
private final INodeId inodeId;
/**
* 写editlog
*/
private final FSEditLog editLog;
private HdfsFileStatus[] reservedStatuses;
@ -300,6 +307,7 @@ public int getListLimit() {
/**
* Caches frequently used file names used in {@link INode} to reuse
* byte[] objects and reduce heap usage.
* 将常用的name缓存下来 以降低byte[]的使用 并降低JVM heap的使用
*/
private final NameCache<ByteArray> nameCache;
@ -318,6 +326,7 @@ public enum DirOp {
this.inodeId = new INodeId();
rootDir = createRoot(ns);
inodeMap = INodeMap.newInstance(rootDir);
// 是否开启权限管理
this.isPermissionEnabled = conf.getBoolean(
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);

View File

@ -225,6 +225,7 @@ public void load(File file, boolean requireSameLayoutVersion)
FSImageFormatProtobuf.Loader loader = new FSImageFormatProtobuf.Loader(
conf, fsn, requireSameLayoutVersion);
impl = loader;
// 开始加载fsimage
loader.load(file);
} else {
Loader loader = new Loader(conf, fsn);

View File

@ -360,6 +360,7 @@ private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
if (!FSImageUtil.checkFileFormat(raFile)) {
throw new IOException("Unrecognized file format");
}
// 加载summary
FileSummary summary = FSImageUtil.loadSummary(raFile);
if (requireSameLayoutVersion && summary.getLayoutVersion() !=
HdfsServerConstants.NAMENODE_LAYOUT_VERSION) {
@ -399,6 +400,7 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
* a particular step to be started for once.
*/
Step currentStep = null;
// 是否开启并发加载
boolean loadInParallel = enableParallelSaveAndLoad(conf);
ExecutorService executorService = null;

View File

@ -805,6 +805,7 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
long loadStart = monotonicNow();
try {
// Load the FSImage and merge it with the EditLog to produce a new FSImage; startup may therefore be slow
namesystem.loadFSImage(startOpt);
} catch (IOException ioe) {
LOG.warn("Encountered exception loading fsimage", ioe);
@ -1217,12 +1218,13 @@ private List<AuditLogger> initAuditLoggers(Configuration conf) {
return Collections.unmodifiableList(auditLoggers);
}
// FSNamesystem在初始化完FSDirectory dir成员会调用loadFSImage方法从fsimage和edits加载元数据信息
private void loadFSImage(StartupOption startOpt) throws IOException {
final FSImage fsImage = getFSImage();
// format before starting up if requested
// format before starting up if requested, i.e. when the startup option is FORMAT
if (startOpt == StartupOption.FORMAT) {
// reuse current id
// reuse current id; run the format operation on the FSImage
fsImage.format(this, fsImage.getStorage().determineClusterId(), false);
startOpt = StartupOption.REGULAR;
@ -1231,6 +1233,7 @@ private void loadFSImage(StartupOption startOpt) throws IOException {
writeLock();
try {
// We shouldn't be calling saveNamespace if we've come up in standby state.
// 根据启动选项及其对应存储目录(${dfs.name.dir})分析存储目录必要的话从先前的事务恢复过来
MetaRecoveryContext recovery = startOpt.createRecoveryContext();
final boolean staleImage
= fsImage.recoverTransitionRead(startOpt, this, recovery);
@ -1302,14 +1305,17 @@ void startCommonServices(Configuration conf, HAContext haContext) throws IOExcep
writeLock();
this.haContext = haContext;
try {
//创建NameNodeResourceChecker并立即检查一次
nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources();
assert !blockManager.isPopulatingReplQueues();
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAFEMODE);
//获取已完成的数据块总量
long completeBlocksTotal = getCompleteBlocksTotal();
prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
completeBlocksTotal);
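// Activate the blockManager. The blockManager manages the mapping between a file's physical blocks and their actual storage locations; it is one of the NameNode's core functions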
blockManager.activate(conf, completeBlocksTotal);
} finally {
writeUnlock("startCommonServices");
@ -1321,6 +1327,7 @@ void startCommonServices(Configuration conf, HAContext haContext) throws IOExcep
inodeAttributeProvider.start();
dir.setINodeAttributeProvider(inodeAttributeProvider);
}
// 注册快照管理器
snapshotManager.registerMXBean();
InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
this.nameNodeHostName = (serviceAddress != null) ?

View File

@ -34,6 +34,7 @@
@InterfaceStability.Unstable
public abstract class INodeAttributeProvider {
// 访问控制执行者接口
public static class AuthorizationContext {
private String fsOwner;
private String supergroup;

View File

@ -746,10 +746,10 @@ protected void initialize(Configuration conf) throws IOException {
intervals);
}
}
//登录kerberos
UserGroupInformation.setConfiguration(conf);
loginAsNameNodeUser(conf);
// 注册监控信息
NameNode.initMetrics(conf, this.getRole());
StartupProgressMetrics.register(startupProgress);
@ -775,12 +775,16 @@ protected void initialize(Configuration conf) throws IOException {
}
if (NamenodeRole.NAMENODE == role) {
// Start the HTTP server; this calls the start function in NameNodeHttpServer, which is implemented on top of org.eclipse.jetty.server.Server
startHttpServer(conf);
}
// 从本地加载FSImage并且与Editlog合并产生新的FSImage
loadNamesystem(conf);
//TODO: purpose of this call still needs to be confirmed
startAliasMapServerIfNecessary(conf);
//Create the RPC server, which wraps NameNodeRpcServer and ClientRPCServer
//Supports protocols such as ClientNameNodeProtocol and DataNodeProtocolPB
rpcServer = createRpcServer(conf);
initReconfigurableBackoffKey();
@ -801,6 +805,7 @@ protected void initialize(Configuration conf) throws IOException {
}
}
//启动执行多个重要的工作线程
startCommonServices(conf);
startMetricsLogger(conf);
}
@ -880,6 +885,7 @@ protected NameNodeRpcServer createRpcServer(Configuration conf)
/** Start the services common to active and standby states */
private void startCommonServices(Configuration conf) throws IOException {
// 创建NameNodeResourceChecker激活BlockManager等
namesystem.startCommonServices(conf, haContext);
registerNNSMXBean();
if (NamenodeRole.NAMENODE != role) {
@ -890,8 +896,10 @@ private void startCommonServices(Configuration conf) throws IOException {
httpServer.setAliasMap(levelDBAliasMapServer.getAliasMap());
}
}
// 启动rpc服务
rpcServer.start();
try {
// 获取启动插件列表
plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
ServicePlugin.class);
} catch (RuntimeException e) {
@ -900,8 +908,10 @@ private void startCommonServices(Configuration conf) throws IOException {
pluginsValue, e);
throw e;
}
// 启动所有插件
for (ServicePlugin p: plugins) {
try {
// Call the plugin's start method; each plugin provides its own implementation by implementing the ServicePlugin interface
p.start(this);
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be started", t);
@ -1025,11 +1035,13 @@ protected NameNode(Configuration conf, NamenodeRole role)
+ " this namenode/service.", clientNamenodeAddress);
}
this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
// 检查HA的状态,主要是判断当前启动的是主实例还是备实例
state = createHAState(getStartupOption(conf));
this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
this.haContext = createHAContext();
try {
initializeGenericKeys(conf, nsId, namenodeId);
// 启动NameNode
initialize(getConf());
state.prepareToEnterState(haContext);
try {

View File

@ -785,6 +785,7 @@ public HdfsFileStatus create(String src, FsPermission masked,
String storagePolicy)
throws IOException {
checkNNStartup();
// 获取客户端的ip
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.create: file "

View File

@ -1676,6 +1676,7 @@ public static void main(String[] args) {
ShutdownHookManager.get().addShutdownHook(
new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
JobConf conf = new JobConf(new YarnConfiguration());
// 加载job.xml信息
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
MRWebAppUtil.initialize(conf);

View File

@ -252,6 +252,7 @@ public void run() {
@SuppressWarnings("unchecked")
protected void handleJobSetup(CommitterJobSetupEvent event) {
try {
// 主要是创建attempt路径
committer.setupJob(event.getJobContext());
context.getEventHandler().handle(
new JobSetupCompletedEvent(event.getJobID()));

View File

@ -1455,6 +1455,7 @@ public JobStateInternal transition(JobImpl job, JobEvent event) {
}
try {
// 初始化token等信息
setup(job);
job.fs = job.getFileSystem(job.conf);

View File

@ -1669,12 +1669,14 @@ public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
// 连接RM
connect();
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
// 提交作业
return submitter.submitJobInternal(Job.this, cluster);
}
});

View File

@ -213,6 +213,8 @@ public void finishApplicationAttempt(FiCaSchedulerApp application,
/**
* Assign containers to applications in the queue or it's children (if any).
* 分配资源包括父队列以及子队列
*
* @param clusterResource the resource of the cluster.
* @param candidates {@link CandidateNodeSet} the nodes that are considered
* for the current placement.

View File

@ -515,6 +515,13 @@ long getAsyncScheduleInterval() {
private final static Random random = new Random(System.currentTimeMillis());
/**
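* Skip nodes that have not reported for 2 heartbeat intervals.
* @param node the scheduler node being checked
* @param cs the capacity scheduler
* @param printVerboseLog whether verbose logging is requested
* @return whether scheduling on this node should be skipped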
*/
private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
CapacityScheduler cs, boolean printVerboseLog) {
// Skip node which missed 2 heartbeats since the node might be dead and
@ -557,7 +564,7 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{
printedVerboseLoggingForAsyncScheduling = false;
}
// Allocate containers of node [start, end)
// Allocate containers of node [start, end); starting from a random node keeps the allocation balanced
for (FiCaSchedulerNode node : nodes) {
if (current++ >= start) {
if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
@ -569,7 +576,7 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{
current = 0;
// Allocate containers of node [0, start)
// Allocate containers of node [0, start), i.e. from index 0 up to the random start node
for (FiCaSchedulerNode node : nodes) {
if (current++ > start) {
break;
@ -1796,6 +1803,7 @@ CSAssignment allocateContainersToNode(
if (!multiNodePlacementEnabled) {
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
node.getNodeID());
// Allocate resources on a single node (legacy logic)
assignment = allocateContainerOnSingleNode(candidates,
node, withNodeHeartbeat);
ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
@ -1803,6 +1811,7 @@ CSAssignment allocateContainersToNode(
} else{
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
ActivitiesManager.EMPTY_NODE_ID);
// Allocate resources across multiple nodes (new logic)
assignment = allocateContainersOnMultiNodes(candidates);
ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
ActivitiesManager.EMPTY_NODE_ID, candidates.getPartition());