作业启动注释添加

This commit is contained in:
LingZhaoHui 2024-01-20 00:28:29 +08:00
parent b58553bb1e
commit aa9f3e1da4
Signed by: zeekling
GPG Key ID: D96E4E75267CA2CC
4 changed files with 7 additions and 0 deletions

View File

@ -277,6 +277,7 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
while (shouldRetry) { while (shouldRetry) {
shouldRetry = false; shouldRetry = false;
try { try {
// 通过rpc调用namenode(或者routerrouter最终还是会调用到namnode)创建文件
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<>(flag), createParent, replication, new EnumSetWritable<>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName, blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName,

View File

@ -42,6 +42,9 @@
* *
* This is responsible for sending edits as well as coordinating * This is responsible for sending edits as well as coordinating
* recovery of the nodes. * recovery of the nodes.
* 主要适用于Journal组件中处理edits log
* 1 接受edit log
* 2 发送edit给namenode方便恢复
*/ */
@KerberosInfo( @KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY, serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY,

View File

@ -785,6 +785,7 @@ public HdfsFileStatus create(String src, FsPermission masked,
String storagePolicy) String storagePolicy)
throws IOException { throws IOException {
checkNNStartup(); checkNNStartup();
// 获取客户端的ip
String clientMachine = getClientMachine(); String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) { if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.create: file " stateChangeLog.debug("*DIR* NameNode.create: file "

View File

@ -1669,12 +1669,14 @@ public void submit()
throws IOException, InterruptedException, ClassNotFoundException { throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE); ensureState(JobState.DEFINE);
setUseNewAPI(); setUseNewAPI();
// 连接RM
connect(); connect();
final JobSubmitter submitter = final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException, public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException { ClassNotFoundException {
// 提交作业
return submitter.submitJobInternal(Job.this, cluster); return submitter.submitJobInternal(Job.this, cluster);
} }
}); });